newStream method
NewStream opens a new stream to given peer p, and writes a p2p/protocol header with given ProtocolID. If there is no connection to p, attempts to create one. If ProtocolID is "", writes no header. (Thread-safe)
If context is not provided, a new Context will be created.
Implementation
@override
Future<P2PStream> newStream(PeerId p, List<ProtocolID> pids, Context context) async {
final startTime = DateTime.now();
_log.warning('🎯 [newStream] ENTERED for peer ${p.toBase58()}, protocols: $pids');
// Set up a timeout context if needed
final hasTimeout = _negtimeout > Duration.zero;
final deadline = hasTimeout ? DateTime.now().add(_negtimeout) : null;
// Phase 1: Connection
final connectStartTime = DateTime.now();
_log.warning('🎯 [newStream Phase 1] Connecting to peer ${p.toBase58()}...');
try {
await connect(AddrInfo(p, []), context: context);
} on IdentifyTimeoutException catch (e) {
final totalTime = DateTime.now().difference(startTime);
_log.warning('⏱️ [newStream Phase 1] Connection to ${p.toBase58()} timed out during identify after ${totalTime.inMilliseconds}ms');
rethrow;
} on IdentifyException catch (e) {
final totalTime = DateTime.now().difference(startTime);
_log.severe('❌ [newStream Phase 1] Connection to ${p.toBase58()} failed due to identify error after ${totalTime.inMilliseconds}ms: $e');
rethrow;
}
_log.warning('✅ [newStream Phase 1] Connected to peer ${p.toBase58()}');
final connectTime = DateTime.now().difference(connectStartTime);
// Phase 2: Stream Creation
final streamCreateStartTime = DateTime.now();
_log.warning('🎯 [newStream Phase 2] Creating stream to peer ${p.toBase58()}...');
final stream = await _network.newStream(context, p);
_log.warning('✅ [newStream Phase 2] Stream ${stream.id()} created to peer ${p.toBase58()}');
final streamCreateTime = DateTime.now().difference(streamCreateStartTime);
// DEBUG: Add protocol assignment tracking
// Phase 3: Identify Wait
// CRITICAL FIX: Do NOT set stream deadline before identify wait
// Identify has its own internal timeout handling (30s)
// Setting deadline here causes it to expire during identify, breaking protocol negotiation
final identifyStartTime = DateTime.now();
// Skip identify for relay connections. Relay connections are already
// Noise-authenticated and the identify exchange fails because the inbound
// side's counter-identify DATA frames never reach the outbound side in time,
// causing a 30s timeout that blocks all protocol negotiation.
final connTransport = stream.conn.state.transport;
final isRelayConn = connTransport == 'circuit-relay';
if (isRelayConn) {
_log.fine('[newStream Phase 3] Skipping identify for relay connection to ${p.toBase58()} on stream ${stream.id()}');
} else {
_log.warning('🎯 [newStream Phase 3] Waiting for identify on stream ${stream.id()}...');
try {
await _idService.identifyWait(stream.conn);
} on IdentifyTimeoutException catch (e) {
final totalTime = DateTime.now().difference(startTime);
_log.warning('⏱️ [newStream Phase 3] Identify for ${p.toBase58()} timed out after ${totalTime.inMilliseconds}ms');
await stream.reset();
// CRITICAL: Remove the stale connection to prevent persistent failure loops.
// The identify timeout indicates the connection is dead - keeping it would
// cause repeated 30-second timeouts on subsequent operations.
try {
_log.warning('🗑️ [newStream Phase 3] Removing stale connection to ${p.toBase58()}');
await _network.closePeer(p);
} catch (closeError) {
_log.warning('⚠️ [newStream Phase 3] Error closing stale connection: $closeError');
}
rethrow;
} on IdentifyException catch (e) {
final totalTime = DateTime.now().difference(startTime);
_log.severe('❌ [newStream Phase 3] Identify for ${p.toBase58()} failed after ${totalTime.inMilliseconds}ms: $e');
await stream.reset();
rethrow;
}
_log.warning('✅ [newStream Phase 3] Identify complete for stream ${stream.id()}');
}
final identifyTime = DateTime.now().difference(identifyStartTime);
// Phase 4: Protocol Negotiation
// CRITICAL FIX: Set deadline AFTER identify completes to give protocol negotiation a fresh timeout window
final negotiationStartTime = DateTime.now();
try {
// Set deadline NOW, after identify is complete
// This gives protocol negotiation its own full timeout window
if (hasTimeout && deadline != null) {
final freshDeadline = DateTime.now().add(_negtimeout);
stream.setDeadline(freshDeadline);
}
// DEBUG: Add detailed protocol negotiation tracking
final selectStartTime = DateTime.now();
_log.warning('🎯 [newStream Phase 4] Negotiating protocols $pids on stream ${stream.id()}...');
final selectedProtocol = await _mux.selectOneOf(stream, pids);
_log.warning('✅ [newStream Phase 4] Protocol negotiated: $selectedProtocol on stream ${stream.id()}');
final selectTime = DateTime.now().difference(selectStartTime);
// DEBUG: Add protocol selection result tracking
if (hasTimeout) {
stream.setDeadline(null); // Clear deadline after successful negotiation
}
if (selectedProtocol == null) {
_log.severe('🤝 [NEWSTREAM-PHASE-4] No protocol selected from: $pids');
stream.reset();
throw Exception('Failed to negotiate any of the requested protocols: $pids with peer $p');
}
// Phase 5: Protocol Setup
final setupStartTime = DateTime.now();
_log.warning('🎯 [newStream Phase 5] Setting up protocol $selectedProtocol on stream ${stream.id()}...');
// DEBUG: Add protocol assignment tracking
await stream.setProtocol(selectedProtocol);
// Ensure the stream's scope is also updated with the protocol.
// This is crucial for services like Identify that attach to the scope.
await stream.scope().setProtocol(selectedProtocol);
// Add the successfully negotiated protocol to the peerstore for the remote peer.
// Note: The go-libp2p implementation adds this *after* the stream handler returns,
// but it seems more robust to add it as soon as negotiation succeeds.
// This ensures that even if the handler has issues, we've recorded the protocol.
peerStore.protoBook.addProtocols(p, [selectedProtocol]);
final setupTime = DateTime.now().difference(setupStartTime);
final negotiationTime = DateTime.now().difference(negotiationStartTime);
final totalTime = DateTime.now().difference(startTime);
_log.warning('✅ [newStream Phase 5] Protocol setup complete for stream ${stream.id()}');
_log.warning('✅ [newStream] COMPLETE - Returning stream ${stream.id()} with protocol $selectedProtocol');
// DEBUG: Add final protocol assignment confirmation
return stream;
} catch (e, stackTrace) {
final negotiationTime = DateTime.now().difference(negotiationStartTime);
final totalTime = DateTime.now().difference(startTime);
_log.severe('❌ [NEWSTREAM-ERROR] Stream creation failed after ${totalTime.inMilliseconds}ms (negotiation: ${negotiationTime.inMilliseconds}ms): $e\n$stackTrace');
try {
stream.reset();
} catch (resetError) {
_log.warning('⚠️ [NEWSTREAM-ERROR] Error during stream reset: $resetError');
}
// No need to check for UnimplementedError specifically anymore
throw Exception('Failed to negotiate protocol with $p for $pids: $e');
}
}