openStream method

  1. @override
Future<MuxedStream> openStream(
  1. Context context
)
override

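Creates a new outbound stream on this session: allocates the next local stream ID, sends the new-stream (SYN) frame, waits for the stream to be acknowledged, and returns the opened stream.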

Implementation

/// Opens a new outbound stream on this session.
///
/// Takes the next locally assigned stream ID (advancing `_nextStreamId` by
/// two), registers a [YamuxStream] for it in `_streams`, sends the new-stream
/// frame, and then waits up to `_config.streamWriteTimeout` for the pending
/// completer stored in `_pendingStreams` to finish (presumably completed when
/// the peer acknowledges the stream; that code is not part of this method).
/// Finally calls `open()` on the stream and returns it.
///
/// [context] is only used here for diagnostic logging.
///
/// Throws a [StateError] if the session is closed, if `canCreateStream` is
/// false, or if the allocated ID has the wrong parity for this side of the
/// connection (odd IDs for the client, even IDs otherwise). Any error raised
/// while sending the frame, waiting for the acknowledgement (including
/// [TimeoutException]) or opening the stream is rethrown after the stream has
/// been unregistered and reset locally.
@override
Future<core_mux.MuxedStream> openStream(Context context) async {
  _log.fine('$_logPrefix openStream: START for outgoing stream. Next Stream ID: $_nextStreamId. Context HashCode: ${context.hashCode}');

  if (_closed) {
    _log.warning('$_logPrefix openStream called on closed session.');
    throw StateError('Session is closed');
  }

  if (!canCreateStream) {
    _log.warning('$_logPrefix newStream: Maximum streams reached ($maxStreams).');
    throw StateError('Maximum streams reached');
  }

  // The ID is consumed before it is validated, so a parity violation below
  // still advances the counter (kept as-is to preserve existing behavior).
  final streamId = _nextStreamId;
  _nextStreamId += 2;

  // Yamux stream ID parity: the client side opens odd-numbered streams
  // (1, 3, 5, ...), the other side opens even-numbered streams (2, 4, 6, ...).
  if (_isClient && streamId % 2 == 0) {
    _log.severe('$_logPrefix YAMUX PROTOCOL VIOLATION: Client attempted to create even stream ID: $streamId');
    throw StateError('Yamux protocol violation: Client cannot create even stream ID: $streamId');
  }
  if (!_isClient && streamId % 2 == 1) {
    _log.severe('$_logPrefix YAMUX PROTOCOL VIOLATION: Server attempted to create odd stream ID: $streamId');
    throw StateError('Yamux protocol violation: Server cannot create odd stream ID: $streamId');
  }

  _log.finer('$_logPrefix YamuxSession.newStream: Assigned streamId $streamId. Next will be $_nextStreamId.');

  final stream = YamuxStream(
    id: streamId,
    protocol: '',
    metadata: {},
    initialWindowSize: _config.initialStreamWindowSize,
    sendFrame: _sendFrame,
    parentConn: this,
    logPrefix: "$_logPrefix StreamID=$streamId",
  );

  // Register the stream before the SYN goes out so it is already present in
  // `_streams` by the time any reply for this ID could be processed.
  _streams[streamId] = stream;
  _log.finer('$_logPrefix YamuxSession.newStream: YamuxStream for ID $streamId instantiated and added to _streams map.');

  final completer = Completer<void>();
  _pendingStreams[streamId] = completer;
  _log.finer('$_logPrefix YamuxSession.newStream: Added pending stream completer for ID $streamId.');

  try {
    _log.fine('$_logPrefix openStream: Attempting to send SYN for new stream ID $streamId.');
    final frame = YamuxFrame.newStream(streamId);
    _log.fine('$_logPrefix openStream: Created Frame for SYN: Type=${frame.type}, StreamID=${frame.streamId}, Flags=${frame.flags}, Length=${frame.length}');
    await _sendFrame(frame);
    _log.fine('$_logPrefix openStream: Successfully sent SYN for new stream ID $streamId.');

    _log.fine('$_logPrefix openStream: Waiting for SYN-ACK for stream ID $streamId (timeout: ${_config.streamWriteTimeout}).');
    await completer.future.timeout(_config.streamWriteTimeout);
    _log.fine('$_logPrefix openStream: Received SYN-ACK for stream ID $streamId.');

    _log.finer('$_logPrefix openStream: Calling stream.open() for ID $streamId.');
    await stream.open();
    _log.fine('$_logPrefix openStream: Outgoing YamuxStream ID $streamId opened locally.');
    return stream;
  } catch (e) {
    _log.severe('$_logPrefix openStream: FAILED to open new stream ID $streamId: $e');

    // Unregister first so the session no longer tracks the half-open stream,
    // whatever happens during the reset below.
    _pendingStreams.remove(streamId);
    _streams.remove(streamId);
    _log.finer('$_logPrefix openStream: Removed stream ID $streamId from _pendingStreams and _streams due to error.');

    // Cleanup is best-effort: a failure while resetting is logged and must
    // not mask the original error, which is rethrown at the end.
    if (e is TimeoutException) {
      // On a timeout the regular reset() path is used.
      try {
        await stream.reset();
      } catch (resetError) {
        _log.warning('$_logPrefix openStream: Error resetting stream ID $streamId during newStream timeout handling: $resetError');
      }
    } else {
      // For any other failure (e.g. the SYN could not be sent) fall back to
      // forceReset() instead of the regular reset().
      try {
        await stream.forceReset();
      } catch (forceResetError) {
        _log.warning('$_logPrefix openStream: Error force-resetting stream ID $streamId: $forceResetError');
      }
    }
    rethrow;
  }
}