upgradeOutbound method

  1. @override
Future<Conn> upgradeOutbound({
  1. required TransportConn connection,
  2. required PeerId remotePeerId,
  3. required Config config,
  4. required MultiAddr remoteAddr,
})
override

Upgrades an outbound connection.

This method orchestrates the entire upgrade process:

  1. Negotiates and applies a security protocol.
  2. Negotiates and applies a stream multiplexer over the secured connection.

Returns a fully upgraded Conn ready for use by the swarm.

Parameters:

  • connection: The raw TransportConn to upgrade.
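  • remotePeerId: The expected PeerId of the remote peer (required). It labels the connection context for direct connections and appears in negotiation error messages; the returned Conn reports the remote peer taken from the secured connection.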
  • config: The node's Config containing security and muxer options.
  • remoteAddr: The remote peer's MultiAddr.

Implementation

/// Upgrades an outbound [connection] into a secured, multiplexed [Conn].
///
/// The upgrade runs two negotiations, each through its own
/// [MultistreamMuxer]:
///
/// 1. A security protocol is selected from `config.securityProtocols` and
///    applied to the raw connection via `secureOutbound`.
/// 2. A stream multiplexer is selected from `config.muxers` and created on
///    top of the secured connection.
///
/// [remotePeerId] labels the context of direct (non-relayed) connections and
/// appears in the security-negotiation error message. The returned connection
/// reports the remote peer taken from the secured connection. [remoteAddr]
/// is used only in error messages.
///
/// Throws an [Exception] when a negotiation selects no protocol, or selects
/// one that is not present in [config]. Any failure is rethrown unchanged
/// after the secured connection (if one was established) and the raw
/// [connection] have been closed on a best-effort basis.
@override
Future<Conn> upgradeOutbound({
  required TransportConn connection,
  required PeerId remotePeerId,
  required Config config,
  required MultiAddr remoteAddr,
}) async {
  // Tracked outside the `try` so the failure path can close the secured layer
  // whenever the handshake had already completed.
  SecuredConnection? securedForCleanup;

  try {
    // Relayed connections already carry a context; direct connections get a
    // fresh one keyed by the remote peer and the transport connection id.
    ConnectionContext? connContext;
    if (connection is RelayedConn) {
      connContext = connection.context;
    } else {
      connContext = ConnectionContext.direct(
        remotePeerId: remotePeerId.toBase58(),
        transportConnectionId: connection.id,
      );
    }

    // --- Step 1: negotiate and apply the security protocol. ---
    final securityProtoIDs =
        config.securityProtocols.map((s) => s.protocolId).toList();
    final chosenSecurityId = await MultistreamMuxer().selectOneOf(
      NegotiationStreamWrapper(connection, '/sec-negotiator'),
      securityProtoIDs,
    );
    if (chosenSecurityId == null) {
      // Cleanup happens once, in the catch block below.
      throw Exception(
          "Failed to negotiate security protocol with $remotePeerId at $remoteAddr");
    }

    final securityModule = config.securityProtocols.firstWhere(
      (s) => s.protocolId == chosenSecurityId,
      orElse: () => throw Exception(
          "Selected security protocol $chosenSecurityId not found in config"),
    );
    final SecuredConnection securedConn =
        await securityModule.secureOutbound(connection);
    securedForCleanup = securedConn;
    // NOTE(review): securedConn.remotePeer is never compared with
    // remotePeerId in this method — confirm the security module verifies the
    // expected peer identity.

    // --- Step 2: negotiate the stream multiplexer over the secured link. ---
    final muxerProtoIDs = config.muxers.map((m) => m.id).toList();
    final chosenMuxerId = await MultistreamMuxer().selectOneOf(
      NegotiationStreamWrapper(securedConn, '/mux-negotiator'),
      muxerProtoIDs,
    );
    if (chosenMuxerId == null) {
      throw Exception(
          "Failed to negotiate stream multiplexer with ${securedConn.remotePeer} at $remoteAddr");
    }

    final muxerEntry = config.muxers.firstWhere(
      (m) => m.id == chosenMuxerId,
      orElse: () => throw Exception(
          "Selected muxer protocol $chosenMuxerId not found in config"),
    );

    // Derive the local peer id before the muxed connection is created, so a
    // key-generation failure cannot strand a muxed connection that the
    // cleanup path below does not close.
    // NOTE(review): without a configured peerKey the local id comes from a
    // freshly generated key pair that is not kept — confirm this is intended.
    final peerKey = config.peerKey;
    final PublicKey localPublicKey = peerKey != null
        ? peerKey.publicKey
        : (await generateEd25519KeyPair()).publicKey;
    final PeerId localPId = PeerId.fromPublicKey(localPublicKey);

    final p2p_mux.Multiplexer p2pMultiplexerInstance =
        muxerEntry.muxerFactory(
      securedConn,
      true, // isClient = true
    );

    final PeerScope peerScope = await resourceManager.viewPeer(
        securedConn.remotePeer, (ps) async => ps);

    final core_mux.MuxedConn muxedConnection =
        await p2pMultiplexerInstance.newConnOnTransport(
      securedConn,
      false, // isServer = false for outbound
      peerScope,
    );

    return UpgradedConnectionImpl(
      muxedConn: muxedConnection,
      securedConn: securedConn,
      negotiatedSecurityProto: chosenSecurityId,
      negotiatedMuxerProto: chosenMuxerId,
      localPeerId: localPId,
      remotePeerId: securedConn.remotePeer,
      context: connContext,
    );
  } catch (_) {
    // Best-effort teardown, secured layer first and raw transport second.
    // Errors raised while closing are deliberately ignored so they cannot
    // replace the failure that triggered the cleanup.
    try {
      await securedForCleanup?.close();
    } catch (_) {
      // Ignored: the original failure is the one worth reporting.
    }
    try {
      await connection.close();
    } catch (_) {
      // Ignored: the original failure is the one worth reporting.
    }
    rethrow;
  }
}