upgradeInbound method

  1. @override
Future<Conn> upgradeInbound({
  1. required TransportConn connection,
  2. required Config config,
})
override

Upgrades an inbound connection.

This method orchestrates the entire upgrade process for an incoming connection:

  1. Negotiates and applies a security protocol.
  2. Negotiates and applies a stream multiplexer over the secured connection.

Returns a fully upgraded Conn ready for use by the swarm.

Parameters:

  • connection: The raw TransportConn to upgrade.
  • config: The node's Config containing security and muxer options.

Implementation

@override
Future<Conn> upgradeInbound({
  required TransportConn connection,
  required Config config,
}) async {
  try {
    // Extract or generate connection context
    ConnectionContext? connContext;
    if (connection is RelayedConn) {
      // For relay connections, use the existing context from RelayedConn
      connContext = connection.context;
    } else {
      // For direct connections, generate a new context
      // Note: We don't know the remote peer yet, will be set after security negotiation
      connContext = null; // Will be created after we know the remote peer
    }

    final mssForSecurity = MultistreamMuxer();
    final negotiationSecStream =
        NegotiationStreamWrapper(connection, '/sec-negotiator-in');

    final Completer<ProtocolID> securityProtoCompleter = Completer();
    if (config.securityProtocols.isEmpty) {
      await connection.close();
      throw Exception(
          "No security protocols configured for inbound connection");
    }
    for (final sp in config.securityProtocols) {
      mssForSecurity.addHandler(sp.protocolId,
          (ProtocolID p, P2PStream s) async {
        if (!securityProtoCompleter.isCompleted) {
          securityProtoCompleter.complete(p);
        }
      });
    }
    await mssForSecurity.handle(negotiationSecStream);
    final chosenSecurityId = await securityProtoCompleter.future;

    final securityModule = config.securityProtocols.firstWhere(
      (s) => s.protocolId == chosenSecurityId,
      orElse: () => throw Exception(
          "Client proposed security protocol $chosenSecurityId not found/supported"),
    );
    final SecuredConnection securedConn =
        await securityModule.secureInbound(connection);

    final mssForMuxers = MultistreamMuxer();
    final negotiationMuxStream =
        NegotiationStreamWrapper(securedConn, '/mux-negotiator-in');
    final Completer<ProtocolID> muxerProtoCompleter = Completer();
    if (config.muxers.isEmpty) {
      await securedConn.close();
      throw Exception("No muxers configured for inbound connection");
    }
    for (final m in config.muxers) {
      mssForMuxers.addHandler(m.id, (ProtocolID p, P2PStream s) async {
        if (!muxerProtoCompleter.isCompleted) {
          muxerProtoCompleter.complete(p);
        }
      });
    }

    await mssForMuxers.handle(negotiationMuxStream);
    final chosenMuxerId = await muxerProtoCompleter.future;

    final muxerEntry = config.muxers.firstWhere(
      (m) => m.id == chosenMuxerId,
      orElse: () => throw Exception(
          "Client proposed muxer $chosenMuxerId not found/supported"),
    );

    final p2p_mux.Multiplexer p2pMultiplexerInstance =
        muxerEntry.muxerFactory(
      securedConn,
      false, // isClient = false
    );

    final PeerScope peerScope = await resourceManager.viewPeer(
        securedConn.remotePeer, (ps) async => ps);

    final core_mux.MuxedConn muxedConnection =
        await p2pMultiplexerInstance.newConnOnTransport(
      securedConn,
      true, // isServer = true for inbound
      peerScope,
    );

    final PublicKey localPublicKey;
    if (config.peerKey != null) {
      localPublicKey = config.peerKey!.publicKey;
    } else {
      final tempKeyPair = await generateEd25519KeyPair();
      localPublicKey = tempKeyPair.publicKey;
    }
    final PeerId localPId = PeerId.fromPublicKey(localPublicKey);

    // Generate context for direct inbound connections if not already set
    if (connContext == null && connection is! RelayedConn) {
      connContext = ConnectionContext.direct(
        remotePeerId: securedConn.remotePeer.toBase58(),
        transportConnectionId: connection.id,
      );
    }

    return UpgradedConnectionImpl(
      muxedConn: muxedConnection,
      securedConn: securedConn,
      negotiatedSecurityProto: chosenSecurityId,
      negotiatedMuxerProto: chosenMuxerId,
      localPeerId: localPId,
      remotePeerId: securedConn.remotePeer,
      context: connContext,
    );
  } catch (e) {
    await connection.close();
    rethrow;
  }
}