upgradeOutbound method

  1. @override
Future<Conn> upgradeOutbound({
  1. required TransportConn connection,
  2. required PeerId remotePeerId,
  3. required Config config,
  4. required MultiAddr remoteAddr,
})
override

Upgrades an outbound connection.

This method orchestrates the entire upgrade process:

  1. Negotiates and applies a security protocol.
  2. Negotiates and applies a stream multiplexer over the secured connection.

Returns a fully upgraded Conn ready for use by the swarm.

Parameters:

  • connection: The raw TransportConn to upgrade.
  • remotePeerId: The PeerId of the remote peer (if known, for outbound).
  • config: The node's Config containing security and muxer options.
  • remoteAddr: The remote peer's MultiAddr.

Implementation

@override
Future<Conn> upgradeOutbound({
  required TransportConn connection,
  required PeerId remotePeerId,
  required Config config,
  required MultiAddr remoteAddr,
}) async {
  try {
    final mssForSecurity = MultistreamMuxer();
    final securityProtoIDs = config.securityProtocols.map((s) => s.protocolId).toList();
    final negotiationSecStream = NegotiationStreamWrapper(connection, '/sec-negotiator');

    print("Going to try and upgrade to [${securityProtoIDs}]");
    final chosenSecurityIdStr = await mssForSecurity.selectOneOf(negotiationSecStream, securityProtoIDs);

    if (chosenSecurityIdStr == null) {
      await connection.close();
      throw Exception("Failed to negotiate security protocol with $remotePeerId at $remoteAddr");
    }
    final chosenSecurityId = chosenSecurityIdStr;

    final securityModule = config.securityProtocols.firstWhere(
      (s) => s.protocolId == chosenSecurityId,
      orElse: () => throw Exception("Selected security protocol $chosenSecurityId not found in config"),
    );
    final SecuredConnection securedConn = await securityModule.secureOutbound(connection);

    final mssForMuxers = MultistreamMuxer();
    final muxerProtoIDs = config.muxers.map((m) => m.id).toList();
    final negotiationMuxStream = NegotiationStreamWrapper(securedConn, '/mux-negotiator');
    final chosenMuxerIdStr = await mssForMuxers.selectOneOf(negotiationMuxStream, muxerProtoIDs);

    if (chosenMuxerIdStr == null) {
      await securedConn.close();
      throw Exception("Failed to negotiate stream multiplexer with ${securedConn.remotePeer} at $remoteAddr");
    }
    final chosenMuxerId = chosenMuxerIdStr;

    final muxerEntry = config.muxers.firstWhere(
      (m) => m.id == chosenMuxerId,
      orElse: () => throw Exception("Selected muxer protocol $chosenMuxerId not found in config"),
    );

    final p2p_mux.Multiplexer p2pMultiplexerInstance = muxerEntry.muxerFactory(
      securedConn,
      true, // isClient = true
    );

    final PeerScope peerScope = await resourceManager.viewPeer(
      securedConn.remotePeer,
      (ps) async => ps
    );

    final core_mux.MuxedConn muxedConnection = await p2pMultiplexerInstance.newConnOnTransport(
      securedConn,
      false, // isServer = false for outbound
      peerScope,
    );

    final PublicKey localPublicKey;
    if (config.peerKey != null) {
      localPublicKey = config.peerKey!.publicKey;
    } else {
      final tempKeyPair = await generateEd25519KeyPair();
      localPublicKey = tempKeyPair.publicKey;
    }
    final PeerId localPId = PeerId.fromPublicKey(localPublicKey);

    return UpgradedConnectionImpl(
      muxedConn: muxedConnection,
      securedConn: securedConn,
      negotiatedSecurityProto: chosenSecurityId,
      negotiatedMuxerProto: chosenMuxerId,
      localPeerId: localPId,
      remotePeerId: securedConn.remotePeer,
    );

  } catch (e) {
    await connection.close();
    rethrow;
  }
}