openStream method
Creates a new stream.
Implementation
/// Opens a new outbound stream on this session.
///
/// Takes the current `_nextStreamId` as the stream's ID (advancing the counter
/// by two), registers a [YamuxStream] and a pending completer under that ID,
/// sends the new-stream (SYN) frame, and then waits up to
/// `_config.streamWriteTimeout` for the pending completer to finish before
/// calling `open()` on the stream. The completer is presumably completed by
/// the inbound frame handler when the peer acknowledges the stream; that code
/// is not part of this method.
///
/// [context] is only used here for its hash code in the start-of-call log
/// line.
///
/// Throws a [StateError] if the session is closed, if `canCreateStream` is
/// false, or if the assigned ID has the wrong parity for this side (clients
/// use odd IDs, servers even IDs). Any failure while sending the SYN, waiting
/// for the acknowledgement, or opening the stream is rethrown after the
/// stream has been removed from `_streams` and `_pendingStreams` and reset.
@override
Future<core_mux.MuxedStream> openStream(Context context) async {
  _log.fine('$_logPrefix openStream: START for outgoing stream. Next Stream ID: $_nextStreamId. Context HashCode: ${context.hashCode}');

  // This is the actual logic for creating an outbound stream for Yamux.
  if (_closed) {
    _log.warning('$_logPrefix openStream called on closed session.');
    throw StateError('Session is closed');
  }

  if (!canCreateStream) {
    _log.warning('$_logPrefix newStream: Maximum streams reached ($maxStreams).');
    throw StateError('Maximum streams reached');
  }

  // The ID is consumed before validation, so a rejected ID is never reused.
  final streamId = _nextStreamId;
  _nextStreamId += 2;

  // Yamux stream ID parity: the client side only opens odd-numbered streams
  // (1, 3, 5, ...), the server side only even-numbered ones (2, 4, 6, ...).
  if (_isClient && streamId % 2 == 0) {
    _log.severe('$_logPrefix YAMUX PROTOCOL VIOLATION: Client attempted to create even stream ID: $streamId');
    throw StateError('Yamux protocol violation: Client cannot create even stream ID: $streamId');
  }
  if (!_isClient && streamId % 2 == 1) {
    _log.severe('$_logPrefix YAMUX PROTOCOL VIOLATION: Server attempted to create odd stream ID: $streamId');
    throw StateError('Yamux protocol violation: Server cannot create odd stream ID: $streamId');
  }

  _log.finer('$_logPrefix YamuxSession.newStream: Assigned streamId $streamId. Next will be $_nextStreamId.');

  final stream = YamuxStream(
    id: streamId,
    protocol: '',
    metadata: {},
    initialWindowSize: _config.initialStreamWindowSize,
    sendFrame: _sendFrame,
    parentConn: this,
    logPrefix: "$_logPrefix StreamID=$streamId",
  );

  // Register the stream before the SYN goes out so it is already tracked by
  // the session when frames for this ID start arriving.
  _streams[streamId] = stream;
  _log.finer('$_logPrefix YamuxSession.newStream: YamuxStream for ID $streamId instantiated and added to _streams map.');

  final completer = Completer<void>();
  _pendingStreams[streamId] = completer;
  _log.finer('$_logPrefix YamuxSession.newStream: Added pending stream completer for ID $streamId.');

  try {
    _log.fine('$_logPrefix openStream: Attempting to send SYN for new stream ID $streamId.');
    final frame = YamuxFrame.newStream(streamId);
    _log.fine('$_logPrefix openStream: Created Frame for SYN: Type=${frame.type}, StreamID=${frame.streamId}, Flags=${frame.flags}, Length=${frame.length}');
    await _sendFrame(frame);
    _log.fine('$_logPrefix openStream: Successfully sent SYN for new stream ID $streamId.');

    _log.fine('$_logPrefix openStream: Waiting for SYN-ACK for stream ID $streamId (timeout: ${_config.streamWriteTimeout}).');
    await completer.future.timeout(_config.streamWriteTimeout);
    _log.fine('$_logPrefix openStream: Received SYN-ACK for stream ID $streamId.');

    _log.finer('$_logPrefix openStream: Calling stream.open() for ID $streamId.');
    await stream.open();
    _log.fine('$_logPrefix openStream: Outgoing YamuxStream ID $streamId opened locally.');
    return stream;
  } catch (e) {
    _log.severe('$_logPrefix openStream: FAILED to open new stream ID $streamId: $e');

    // Drop the session-level bookkeeping first so the half-open stream is no
    // longer reachable through the session maps.
    _pendingStreams.remove(streamId);
    _streams.remove(streamId);
    _log.finer('$_logPrefix openStream: Removed stream ID $streamId from _pendingStreams and _streams due to error.');

    // Cleanup is best effort: a failure while resetting is logged and must
    // never replace the original error, which is rethrown below.
    if (e is TimeoutException) {
      // The SYN was sent but not acknowledged in time, so a regular reset is
      // appropriate.
      try {
        await stream.reset();
      } catch (resetError) {
        _log.warning('$_logPrefix openStream: Error resetting stream ID $streamId during newStream timeout handling: $resetError');
      }
    } else {
      // For other errors (e.g. _sendFrame failed) a regular reset would
      // likely fail as well, so only force local cleanup.
      try {
        await stream.forceReset();
      } catch (forceResetError) {
        _log.warning('$_logPrefix openStream: Error force-resetting stream ID $streamId: $forceResetError');
      }
    }
    rethrow;
  }
}