newStream method

Future<P2PStream> newStream(
  1. Context context
)
override

Creates a new outbound stream to the remote peer over this connection.

Implementation

/// Opens a new outbound stream to [remotePeer] over this connection.
///
/// Delegates to `conn.newStream` to open the underlying muxed stream, obtains
/// a stream management scope from the swarm's resource manager, wraps both in
/// a [SwarmStream], and registers the wrapper in `_streams` keyed by the
/// stream ID reported by the underlying stream.
///
/// Throws an [Exception] if this connection is already marked closed. If the
/// underlying open fails with a "session is closed"-style error, the
/// connection is marked closed, its removal from the swarm is scheduled, and
/// a descriptive [Exception] is thrown instead. Any other error from the
/// underlying open is rethrown unchanged.
///
/// If any step after the underlying stream has been opened fails (a
/// non-numeric stream ID, resource manager refusal, registration failure),
/// the acquired scope is released and the underlying stream is reset before
/// the original error is rethrown, so neither is leaked.
Future<P2PStream> newStream(Context context) async {
  _logger.fine('SwarmConn.newStream ($id): Entered to peer $remotePeer. Context HashCode: ${context.hashCode}');
  if (_isClosed) {
    _logger.fine('SwarmConn.newStream ($id): Connection is closed. Throwing exception.');
    throw Exception('Connection is closed');
  }
  _logger.fine('SwarmConn.newStream ($id): Connection is open.');
  _logger.fine('SwarmConn.newStream ($id): Type of this.conn (the UpgradedConnectionImpl): ${conn.runtimeType}');
  _logger.fine('SwarmConn.newStream ($id): About to call this.conn.newStream(context). This will call UpgradedConnectionImpl.newStream.');

  P2PStream underlyingMuxedStreamResult;
  try {
    // The stream ID is assigned by the underlying connection/muxer; this
    // wrapper does not allocate IDs of its own.
    underlyingMuxedStreamResult = await conn.newStream(context);
  } catch (e, st) {
    _logger.severe('SwarmConn.newStream ($id): Error calling this.conn.newStream(context): $e\n$st');

    // Record error for health tracking with context
    _healthMonitor.recordError(e, 'newStream');

    // A "session is closed" error means the underlying multiplexer session is
    // gone but this SwarmConn has not been cleaned up yet. The match is on the
    // lower-cased message, so 'Bad state: Session is closed' is covered by the
    // first check.
    final errorMessage = e.toString().toLowerCase();
    if (errorMessage.contains('session is closed') ||
        errorMessage.contains('closed session')) {
      _logger.warning('SwarmConn.newStream ($id): Detected closed session error. Marking connection as closed and notifying swarm.');

      // Mark this connection as closed so later calls fail fast.
      _isClosed = true;

      // Ask the swarm to drop this stale connection. Scheduled as a microtask
      // so the caller gets its error without waiting on the removal.
      Future.microtask(() async {
        try {
          await swarm.removeConnection(this);
          _logger.fine('SwarmConn.newStream ($id): Successfully removed stale connection from swarm.');
        } catch (removeError) {
          _logger.warning('SwarmConn.newStream ($id): Error removing stale connection from swarm: $removeError');
        }
      });

      // Replace the low-level error with one that tells the caller to retry.
      throw Exception('Connection to ${remotePeer} has a closed session and has been marked for cleanup. Please retry the operation.');
    }

    rethrow;
  }

  final underlyingStreamId = underlyingMuxedStreamResult.id();
  _logger.fine('SwarmConn.newStream ($id): Returned from this.conn.newStream(). Result type: ${underlyingMuxedStreamResult.runtimeType}, Stream ID: $underlyingStreamId');

  final SwarmStream stream;
  try {
    // Parse the map key first: a malformed ID must fail before any resource
    // scope has been acquired.
    final yamuxStreamId = int.parse(underlyingStreamId);

    // Obtain a StreamManagementScope from the ResourceManager
    _logger.fine('SwarmConn.newStream ($id): Obtaining StreamManagementScope for SwarmStream using underlying muxed stream id: $underlyingStreamId.');
    final streamManagementScope = await swarm.resourceManager.openStream(
      remotePeer, // The peer this stream is being opened to
      Direction.outbound, // Streams created via newStream are outbound
    );

    try {
      // Create the SwarmStream wrapper, reusing the underlying stream's ID.
      stream = SwarmStream(
        id: underlyingStreamId,
        conn: this,
        direction: Direction.outbound,
        opened: DateTime.now(),
        underlyingMuxedStream: underlyingMuxedStreamResult as P2PStream<Uint8List>,
        managementScope: streamManagementScope,
      );

      // Register the wrapper under the underlying stream ID.
      await _streamsLock.synchronized(() async {
        _streams[yamuxStreamId] = stream;
      });
    } catch (_) {
      // The scope was acquired but the stream never became usable: release it.
      try {
        streamManagementScope.done();
      } catch (scopeError) {
        _logger.warning('SwarmConn.newStream ($id): Error releasing stream scope after failed setup: $scopeError');
      }
      rethrow;
    }
  } catch (e, st) {
    _logger.severe('SwarmConn.newStream ($id): Failed to set up SwarmStream for underlying stream $underlyingStreamId: $e\n$st');

    // The underlying stream is open but will never be handed to the caller;
    // reset it so it is not leaked. Cleanup failures must not mask the
    // original error.
    try {
      await underlyingMuxedStreamResult.reset();
    } catch (resetError) {
      _logger.warning('SwarmConn.newStream ($id): Error resetting underlying stream after failed setup: $resetError');
    }
    rethrow;
  }

  // Record successful stream creation for health tracking
  _healthMonitor.recordSuccess('Stream created successfully');

  return stream;
}