newStream method

  1. @override
Future<P2PStream> newStream(
  1. PeerId p,
  2. List<ProtocolID> pids,
  3. Context context
)
override

NewStream opens a new stream to the given peer p and writes a p2p/protocol header with the given ProtocolID. If there is no connection to p, it attempts to create one. If ProtocolID is "", it writes no header. (Thread-safe)

If context is not provided, a new Context will be created.

Implementation

/// Opens a new stream to peer [p] and negotiates one of the protocols in
/// [pids] on it.
///
/// The work is done in five phases:
///
/// 1. Connect to [p] via `connect` (passing [context] through).
/// 2. Open a raw stream on the network.
/// 3. Wait for identify to finish on the stream's connection. This step is
///    skipped when the connection's transport is `'circuit-relay'`.
/// 4. Negotiate one of [pids] through the protocol muxer. When the
///    negotiation timeout is positive, a deadline of "now + timeout" is set
///    on the stream for the duration of the negotiation and cleared after.
/// 5. Record the selected protocol on the stream, on the stream's scope and
///    in the peer store's protocol book for [p].
///
/// Returns the stream with its protocol set.
///
/// Throws [IdentifyTimeoutException] or [IdentifyException] (rethrown
/// unchanged) when connecting or the identify wait fails. On an identify
/// failure in phase 3 the stream is reset first; on a timeout the peer's
/// connections are additionally closed. Any failure in phases 4 or 5,
/// including "no protocol selected", resets the stream and is rethrown
/// wrapped in a generic [Exception].
@override
Future<P2PStream> newStream(PeerId p, List<ProtocolID> pids, Context context) async {
  final startTime = DateTime.now();
  _log.warning('🎯 [newStream] ENTERED for peer ${p.toBase58()}, protocols: $pids');

  // A non-positive negotiation timeout disables the stream deadline entirely.
  final hasTimeout = _negtimeout > Duration.zero;

  // Phase 1: Connection
  _log.warning('🎯 [newStream Phase 1] Connecting to peer ${p.toBase58()}...');

  try {
    await connect(AddrInfo(p, []), context: context);
  } on IdentifyTimeoutException {
    final totalTime = DateTime.now().difference(startTime);
    _log.warning('⏱️ [newStream Phase 1] Connection to ${p.toBase58()} timed out during identify after ${totalTime.inMilliseconds}ms');
    rethrow;
  } on IdentifyException catch (e) {
    final totalTime = DateTime.now().difference(startTime);
    _log.severe('❌ [newStream Phase 1] Connection to ${p.toBase58()} failed due to identify error after ${totalTime.inMilliseconds}ms: $e');
    rethrow;
  }
  _log.warning('✅ [newStream Phase 1] Connected to peer ${p.toBase58()}');

  // Phase 2: Stream Creation
  _log.warning('🎯 [newStream Phase 2] Creating stream to peer ${p.toBase58()}...');

  final stream = await _network.newStream(context, p);
  _log.warning('✅ [newStream Phase 2] Stream ${stream.id()} created to peer ${p.toBase58()}');

  // Phase 3: Identify Wait
  // Do NOT set the stream deadline before the identify wait: identify does
  // its own timeout handling, and a deadline set here could expire while
  // identify is still running, breaking the protocol negotiation below.
  //
  // Identify is skipped for relay connections. Relay connections are already
  // Noise-authenticated and the identify exchange fails because the inbound
  // side's counter-identify DATA frames never reach the outbound side in time,
  // causing a 30s timeout that blocks all protocol negotiation.
  final isRelayConn = stream.conn.state.transport == 'circuit-relay';
  if (isRelayConn) {
    _log.fine('[newStream Phase 3] Skipping identify for relay connection to ${p.toBase58()} on stream ${stream.id()}');
  } else {
    _log.warning('🎯 [newStream Phase 3] Waiting for identify on stream ${stream.id()}...');

    try {
      await _idService.identifyWait(stream.conn);
    } on IdentifyTimeoutException {
      final totalTime = DateTime.now().difference(startTime);
      _log.warning('⏱️ [newStream Phase 3] Identify for ${p.toBase58()} timed out after ${totalTime.inMilliseconds}ms');
      await stream.reset();

      // Remove the stale connection to prevent persistent failure loops.
      // The identify timeout indicates the connection is dead - keeping it
      // would cause repeated timeouts on subsequent operations. Closing is
      // best-effort: a failure here must not mask the original timeout.
      try {
        _log.warning('🗑️ [newStream Phase 3] Removing stale connection to ${p.toBase58()}');
        await _network.closePeer(p);
      } catch (closeError) {
        _log.warning('⚠️ [newStream Phase 3] Error closing stale connection: $closeError');
      }

      rethrow;
    } on IdentifyException catch (e) {
      final totalTime = DateTime.now().difference(startTime);
      _log.severe('❌ [newStream Phase 3] Identify for ${p.toBase58()} failed after ${totalTime.inMilliseconds}ms: $e');
      await stream.reset();
      rethrow;
    }
    _log.warning('✅ [newStream Phase 3] Identify complete for stream ${stream.id()}');
  }

  // Phase 4: Protocol Negotiation
  final negotiationStartTime = DateTime.now();

  try {
    // Set the deadline only now, after identify has completed, so protocol
    // negotiation gets its own full timeout window.
    if (hasTimeout) {
      stream.setDeadline(DateTime.now().add(_negtimeout));
    }

    _log.warning('🎯 [newStream Phase 4] Negotiating protocols $pids on stream ${stream.id()}...');

    final selectedProtocol = await _mux.selectOneOf(stream, pids);
    _log.warning('✅ [newStream Phase 4] Protocol negotiated: $selectedProtocol on stream ${stream.id()}');

    if (hasTimeout) {
      // Negotiation is over; the deadline must not affect later stream I/O.
      stream.setDeadline(null);
    }

    if (selectedProtocol == null) {
      _log.severe('🤝 [NEWSTREAM-PHASE-4] No protocol selected from: $pids');
      // No reset here: the catch block below resets the stream exactly once
      // for every failure raised inside this try, including this one.
      throw Exception('Failed to negotiate any of the requested protocols: $pids with peer $p');
    }

    // Phase 5: Protocol Setup
    _log.warning('🎯 [newStream Phase 5] Setting up protocol $selectedProtocol on stream ${stream.id()}...');

    await stream.setProtocol(selectedProtocol);

    // Ensure the stream's scope is also updated with the protocol.
    // This is crucial for services like Identify that attach to the scope.
    await stream.scope().setProtocol(selectedProtocol);

    // Add the successfully negotiated protocol to the peerstore for the remote peer.
    // Note: The go-libp2p implementation adds this *after* the stream handler returns,
    // but it seems more robust to add it as soon as negotiation succeeds.
    // This ensures that even if the handler has issues, we've recorded the protocol.
    peerStore.protoBook.addProtocols(p, [selectedProtocol]);

    _log.warning('✅ [newStream Phase 5] Protocol setup complete for stream ${stream.id()}');
    _log.warning('✅ [newStream] COMPLETE - Returning stream ${stream.id()} with protocol $selectedProtocol');

    return stream;
  } catch (e, stackTrace) {
    final negotiationTime = DateTime.now().difference(negotiationStartTime);
    final totalTime = DateTime.now().difference(startTime);
    _log.severe('❌ [NEWSTREAM-ERROR] Stream creation failed after ${totalTime.inMilliseconds}ms (negotiation: ${negotiationTime.inMilliseconds}ms): $e\n$stackTrace');

    // Await the reset so that an asynchronous reset failure is caught and
    // logged here instead of surfacing as an unhandled future error.
    try {
      await stream.reset();
    } catch (resetError) {
      _log.warning('⚠️ [NEWSTREAM-ERROR] Error during stream reset: $resetError');
    }

    throw Exception('Failed to negotiate protocol with $p for $pids: $e');
  }
}