newStream method

  1. @override
Future<P2PStream> newStream(
  1. PeerId p,
  2. List<ProtocolID> pids,
  3. Context context
)
override

NewStream opens a new stream to the given peer p and negotiates one of the given protocol IDs (pids), writing a p2p/protocol header for it. If there is no connection to p, it attempts to create one. If ProtocolID is "", it writes no header. (Thread-safe)

If context is not provided, a new Context will be created.

Implementation

/// Opens a new stream to peer [p] and negotiates one of the protocols in
/// [pids] on it.
///
/// The call proceeds in phases:
///
/// 1. Connects to [p] via [connect] (passing [context]).
/// 2. Opens a raw stream through the underlying network.
/// 3. Waits for identify to finish on the stream's connection. No stream
///    deadline is set during this phase.
/// 4. Negotiates one of [pids]. When the configured negotiation timeout is
///    greater than zero, a fresh deadline of `now + timeout` is set on the
///    stream just before negotiation and cleared once selection returns.
/// 5. Records the selected protocol on the stream, on its scope, and in the
///    peerstore's protocol book for [p].
///
/// Returns the stream with the negotiated protocol set.
///
/// Throws [IdentifyTimeoutException] or [IdentifyException] (rethrown as-is)
/// if connecting or identify fails with one of those types; on an identify
/// failure in phase 3 the stream is reset first, and on an identify timeout
/// the peer's connections are additionally closed. Any failure during
/// negotiation or protocol setup resets the stream and is rethrown wrapped in
/// a generic [Exception].
@override
Future<P2PStream> newStream(
    PeerId p, List<ProtocolID> pids, Context context) async {
  // Stopwatch is monotonic, so the elapsed times reported in the logs are not
  // skewed by wall-clock adjustments.
  final totalWatch = Stopwatch()..start();
  _log.warning(
      '🎯 [newStream] ENTERED for peer ${p.toBase58()}, protocols: $pids');

  // A non-positive negotiation timeout means "no deadline".
  final hasTimeout = _negtimeout > Duration.zero;

  // Phase 1: Connection
  _log.warning(
      '🎯 [newStream Phase 1] Connecting to peer ${p.toBase58()}...');

  try {
    await connect(AddrInfo(p, []), context: context);
  } on IdentifyTimeoutException {
    _log.warning(
        '⏱️ [newStream Phase 1] Connection to ${p.toBase58()} timed out during identify after ${totalWatch.elapsedMilliseconds}ms');
    rethrow;
  } on IdentifyException catch (e) {
    _log.severe(
        '❌ [newStream Phase 1] Connection to ${p.toBase58()} failed due to identify error after ${totalWatch.elapsedMilliseconds}ms: $e');
    rethrow;
  }
  _log.warning('✅ [newStream Phase 1] Connected to peer ${p.toBase58()}');

  // Phase 2: Stream Creation
  _log.warning(
      '🎯 [newStream Phase 2] Creating stream to peer ${p.toBase58()}...');

  final stream = await _network.newStream(context, p);
  _log.warning(
      '✅ [newStream Phase 2] Stream ${stream.id()} created to peer ${p.toBase58()}');

  // Phase 3: Identify Wait
  // Do NOT set a stream deadline before the identify wait: identify has its
  // own internal timeout handling, and a deadline set here could expire while
  // identify is still running, breaking the protocol negotiation that follows.
  _log.warning(
      '🎯 [newStream Phase 3] Waiting for identify on stream ${stream.id()}...');

  try {
    await _idService.identifyWait(stream.conn);
  } on IdentifyTimeoutException {
    _log.warning(
        '⏱️ [newStream Phase 3] Identify for ${p.toBase58()} timed out after ${totalWatch.elapsedMilliseconds}ms');
    await stream.reset();

    // Remove the stale connection to prevent persistent failure loops. An
    // identify timeout indicates the connection is dead; keeping it would
    // cause repeated timeouts on subsequent operations. Closing is
    // best-effort: a failure here must not mask the original timeout.
    try {
      _log.warning(
          '🗑️ [newStream Phase 3] Removing stale connection to ${p.toBase58()}');
      await _network.closePeer(p);
    } catch (closeError) {
      _log.warning(
          '⚠️ [newStream Phase 3] Error closing stale connection: $closeError');
    }

    rethrow;
  } on IdentifyException catch (e) {
    _log.severe(
        '❌ [newStream Phase 3] Identify for ${p.toBase58()} failed after ${totalWatch.elapsedMilliseconds}ms: $e');
    await stream.reset();
    rethrow;
  }
  _log.warning(
      '✅ [newStream Phase 3] Identify complete for stream ${stream.id()}');

  // Phase 4: Protocol Negotiation
  final negotiationWatch = Stopwatch()..start();

  try {
    // Set the deadline only now, after identify is complete, so protocol
    // negotiation gets its own full timeout window.
    if (hasTimeout) {
      stream.setDeadline(DateTime.now().add(_negtimeout));
    }

    _log.warning(
        '🎯 [newStream Phase 4] Negotiating protocols $pids on stream ${stream.id()}...');

    final selectedProtocol = await _mux.selectOneOf(stream, pids);
    _log.warning(
        '✅ [newStream Phase 4] Protocol negotiated: $selectedProtocol on stream ${stream.id()}');

    if (hasTimeout) {
      stream.setDeadline(null); // Clear deadline once selection has returned.
    }

    if (selectedProtocol == null) {
      _log.severe('🤝 [NEWSTREAM-PHASE-4] No protocol selected from: $pids');
      // The stream is reset exactly once, by the catch block below.
      throw Exception(
          'Failed to negotiate any of the requested protocols: $pids with peer $p');
    }

    // Phase 5: Protocol Setup
    _log.warning(
        '🎯 [newStream Phase 5] Setting up protocol $selectedProtocol on stream ${stream.id()}...');

    await stream.setProtocol(selectedProtocol);

    // Ensure the stream's scope is also updated with the protocol.
    // This is crucial for services like Identify that attach to the scope.
    await stream.scope().setProtocol(selectedProtocol);

    // Add the successfully negotiated protocol to the peerstore for the remote
    // peer. Note: the go-libp2p implementation adds this *after* the stream
    // handler returns, but it seems more robust to add it as soon as
    // negotiation succeeds, so the protocol is recorded even if the handler
    // has issues.
    peerStore.protoBook.addProtocols(p, [selectedProtocol]);

    _log.warning(
        '✅ [newStream Phase 5] Protocol setup complete for stream ${stream.id()}');
    _log.warning(
        '✅ [newStream] COMPLETE - Returning stream ${stream.id()} with protocol $selectedProtocol');

    return stream;
  } catch (e, stackTrace) {
    _log.severe(
        '❌ [NEWSTREAM-ERROR] Stream creation failed after ${totalWatch.elapsedMilliseconds}ms (negotiation: ${negotiationWatch.elapsedMilliseconds}ms): $e\n$stackTrace');

    // Await the reset so that an asynchronous failure is actually caught
    // here instead of surfacing as an unhandled error.
    try {
      await stream.reset();
    } catch (resetError) {
      _log.warning(
          '⚠️ [NEWSTREAM-ERROR] Error during stream reset: $resetError');
    }

    throw Exception('Failed to negotiate protocol with $p for $pids: $e');
  }
}