newStream method
Creates a new outbound stream to the remote peer over this connection.
Implementation
/// Opens a new outbound stream to [remotePeer] over this connection.
///
/// The stream is opened on the underlying connection via `conn.newStream`,
/// a stream management scope is obtained from the swarm's resource manager,
/// and the resulting [SwarmStream] is registered in `_streams` under the
/// integer form of the muxer-assigned stream ID.
///
/// Throws an [Exception] if this connection is already marked closed, or if
/// opening the underlying stream fails with a "closed session" error; in the
/// latter case the connection is marked closed and its removal from the swarm
/// is scheduled. Any other error from opening the underlying stream is
/// rethrown unchanged.
///
/// Throws a [FormatException] if the muxer's stream ID is not an integer.
/// If any step after the underlying stream has been opened fails, that
/// stream is reset (best effort) before the error is rethrown.
Future<P2PStream> newStream(Context context) async {
  _logger.fine('SwarmConn.newStream ($id): Entered to peer $remotePeer. Context HashCode: ${context.hashCode}');

  if (_isClosed) {
    _logger.fine('SwarmConn.newStream ($id): Connection is closed. Throwing exception.');
    throw Exception('Connection is closed');
  }

  _logger.fine('SwarmConn.newStream ($id): Connection is open.');
  _logger.fine('SwarmConn.newStream ($id): Type of this.conn (the UpgradedConnectionImpl): ${conn.runtimeType}');
  _logger.fine('SwarmConn.newStream ($id): About to call this.conn.newStream(context). This will call UpgradedConnectionImpl.newStream.');

  P2PStream underlyingMuxedStreamResult;
  try {
    // The muxer assigns the stream ID; this method does not allocate its own.
    underlyingMuxedStreamResult = await conn.newStream(context);
  } catch (e, st) {
    _logger.severe('SwarmConn.newStream ($id): Error calling this.conn.newStream(context): $e\n$st');

    // Record error for health tracking with context.
    _healthMonitor.recordError(e, 'newStream');

    // A "session is closed" style error means the underlying multiplexer
    // session is gone while this SwarmConn is still registered. The check is
    // textual because only the error's string form is inspected here.
    final errorMessage = e.toString().toLowerCase();
    final sessionClosed = errorMessage.contains('session is closed') ||
        errorMessage.contains('closed session');

    if (sessionClosed) {
      _logger.warning('SwarmConn.newStream ($id): Detected closed session error. Marking connection as closed and notifying swarm.');

      // Fail fast on subsequent calls.
      _isClosed = true;

      // Ask the swarm to drop this stale connection without making the
      // caller wait for it. Failures are logged inside the task, so the
      // un-awaited future cannot surface an unhandled error.
      Future.microtask(() async {
        try {
          await swarm.removeConnection(this);
          _logger.fine('SwarmConn.newStream ($id): Successfully removed stale connection from swarm.');
        } catch (removeError) {
          _logger.warning('SwarmConn.newStream ($id): Error removing stale connection from swarm: $removeError');
        }
      });

      // Replace the low-level error with one that tells the caller to retry.
      throw Exception('Connection to $remotePeer has a closed session and has been marked for cleanup. Please retry the operation.');
    }

    rethrow;
  }

  final muxedStreamId = underlyingMuxedStreamResult.id();
  _logger.fine('SwarmConn.newStream ($id): Returned from this.conn.newStream(). Result type: ${underlyingMuxedStreamResult.runtimeType}, Stream ID: $muxedStreamId');

  final SwarmStream stream;
  try {
    // Validate the muxed stream before acquiring any resource-manager scope,
    // so a malformed ID or unexpected stream type cannot leave a scope behind.
    final yamuxStreamId = int.parse(muxedStreamId);
    final typedMuxedStream = underlyingMuxedStreamResult as P2PStream<Uint8List>;

    _logger.fine('SwarmConn.newStream ($id): Obtaining StreamManagementScope for SwarmStream using underlying muxed stream id: $muxedStreamId.');
    final streamManagementScope = await swarm.resourceManager.openStream(
      remotePeer, // The peer this stream is being opened to
      Direction.outbound, // Streams created via newStream are outbound
    );

    // Wrap the muxed stream, reusing the muxer-assigned ID as the stream ID.
    final created = SwarmStream(
      id: muxedStreamId,
      conn: this,
      direction: Direction.outbound,
      opened: DateTime.now(),
      underlyingMuxedStream: typedMuxedStream,
      managementScope: streamManagementScope,
    );

    // Register the stream under its integer ID while holding the streams lock.
    await _streamsLock.synchronized(() async {
      _streams[yamuxStreamId] = created;
    });

    stream = created;
  } catch (e, st) {
    _logger.warning('SwarmConn.newStream ($id): Failed to set up SwarmStream for muxed stream $muxedStreamId: $e\n$st');

    // The muxed stream is already open; abandon it so it does not linger on
    // the session. Cleanup is best effort and must not mask the real error.
    // NOTE(review): relies on P2PStream.reset(); confirm against the interface.
    try {
      await underlyingMuxedStreamResult.reset();
    } catch (resetError) {
      _logger.warning('SwarmConn.newStream ($id): Error resetting muxed stream $muxedStreamId during cleanup: $resetError');
    }

    rethrow;
  }

  // Record successful stream creation for health tracking.
  _healthMonitor.recordSuccess('Stream created successfully');
  return stream;
}