upgradeInbound method

  1. @override
Future<Conn> upgradeInbound({
  1. required TransportConn connection,
  2. required Config config,
})
override

Upgrades an inbound connection.

This method orchestrates the entire upgrade process for an incoming connection:

  1. Negotiates and applies a security protocol.
  2. Negotiates and applies a stream multiplexer over the secured connection.

Returns a fully upgraded Conn ready for use by the swarm.

Parameters:

  • connection: The raw TransportConn to upgrade.
  • config: The node's Config containing security and muxer options.

Implementation

@override
Future<Conn> upgradeInbound({
  required TransportConn connection,
  required Config config,
}) async {
  try {
    final mssForSecurity = MultistreamMuxer();
    final negotiationSecStream = NegotiationStreamWrapper(connection, '/sec-negotiator-in');

    final Completer<ProtocolID> securityProtoCompleter = Completer();
    if (config.securityProtocols.isEmpty) {
      await connection.close();
      throw Exception("No security protocols configured for inbound connection");
    }
    for (final sp in config.securityProtocols) {
      mssForSecurity.addHandler(sp.protocolId, (ProtocolID p, P2PStream s) async {
        if (!securityProtoCompleter.isCompleted) {
          securityProtoCompleter.complete(p);
        }
      });
    }
    await mssForSecurity.handle(negotiationSecStream);
    final chosenSecurityId = await securityProtoCompleter.future;

    final securityModule = config.securityProtocols.firstWhere(
      (s) => s.protocolId == chosenSecurityId,
      orElse: () => throw Exception("Client proposed security protocol $chosenSecurityId not found/supported"),
    );
    final SecuredConnection securedConn = await securityModule.secureInbound(connection);

    final mssForMuxers = MultistreamMuxer();
    final negotiationMuxStream = NegotiationStreamWrapper(securedConn, '/mux-negotiator-in');
    final Completer<ProtocolID> muxerProtoCompleter = Completer();
    if (config.muxers.isEmpty) {
      await securedConn.close();
      throw Exception("No muxers configured for inbound connection");
    }
    for (final m in config.muxers) {
      mssForMuxers.addHandler(m.id, (ProtocolID p, P2PStream s) async {
        if (!muxerProtoCompleter.isCompleted) {
          muxerProtoCompleter.complete(p);
        }
      });
    }

    await mssForMuxers.handle(negotiationMuxStream);
    final chosenMuxerId = await muxerProtoCompleter.future;

    final muxerEntry = config.muxers.firstWhere(
      (m) => m.id == chosenMuxerId,
      orElse: () => throw Exception("Client proposed muxer $chosenMuxerId not found/supported"),
    );

    final p2p_mux.Multiplexer p2pMultiplexerInstance = muxerEntry.muxerFactory(
      securedConn,
      false, // isClient = false
    );

    final PeerScope peerScope = await resourceManager.viewPeer(
      securedConn.remotePeer,
      (ps) async => ps
    );

    final core_mux.MuxedConn muxedConnection = await p2pMultiplexerInstance.newConnOnTransport(
      securedConn,
      true,  // isServer = true for inbound
      peerScope,
    );

    final PublicKey localPublicKey;
    if (config.peerKey != null) {
      localPublicKey = config.peerKey!.publicKey;
    } else {
      final tempKeyPair = await generateEd25519KeyPair();
      localPublicKey = tempKeyPair.publicKey;
    }
    final PeerId localPId = PeerId.fromPublicKey(localPublicKey);

    return UpgradedConnectionImpl(
      muxedConn: muxedConnection,
      securedConn: securedConn,
      negotiatedSecurityProto: chosenSecurityId,
      negotiatedMuxerProto: chosenMuxerId,
      localPeerId: localPId,
      remotePeerId: securedConn.remotePeer,
    );

  } catch (e) {
    await connection.close();
    rethrow;
  }
}