upgradeOutbound method
Future<Conn>
upgradeOutbound({
- required TransportConn connection,
- required PeerId remotePeerId,
- required Config config,
- required MultiAddr remoteAddr,
override
Upgrades an outbound connection.
This method orchestrates the entire upgrade process:
- Negotiates and applies a security protocol.
- Negotiates and applies a stream multiplexer over the secured connection.
Returns a fully upgraded Conn ready for use by the swarm.
Parameters:
connection
: The raw TransportConn to upgrade.remotePeerId
: The PeerId of the remote peer (if known, for outbound).config
: The node's Config containing security and muxer options.remoteAddr
: The remote peer's MultiAddr.
Implementation
@override
Future<Conn> upgradeOutbound({
required TransportConn connection,
required PeerId remotePeerId,
required Config config,
required MultiAddr remoteAddr,
}) async {
try {
final mssForSecurity = MultistreamMuxer();
final securityProtoIDs = config.securityProtocols.map((s) => s.protocolId).toList();
final negotiationSecStream = NegotiationStreamWrapper(connection, '/sec-negotiator');
print("Going to try and upgrade to [${securityProtoIDs}]");
final chosenSecurityIdStr = await mssForSecurity.selectOneOf(negotiationSecStream, securityProtoIDs);
if (chosenSecurityIdStr == null) {
await connection.close();
throw Exception("Failed to negotiate security protocol with $remotePeerId at $remoteAddr");
}
final chosenSecurityId = chosenSecurityIdStr;
final securityModule = config.securityProtocols.firstWhere(
(s) => s.protocolId == chosenSecurityId,
orElse: () => throw Exception("Selected security protocol $chosenSecurityId not found in config"),
);
final SecuredConnection securedConn = await securityModule.secureOutbound(connection);
final mssForMuxers = MultistreamMuxer();
final muxerProtoIDs = config.muxers.map((m) => m.id).toList();
final negotiationMuxStream = NegotiationStreamWrapper(securedConn, '/mux-negotiator');
final chosenMuxerIdStr = await mssForMuxers.selectOneOf(negotiationMuxStream, muxerProtoIDs);
if (chosenMuxerIdStr == null) {
await securedConn.close();
throw Exception("Failed to negotiate stream multiplexer with ${securedConn.remotePeer} at $remoteAddr");
}
final chosenMuxerId = chosenMuxerIdStr;
final muxerEntry = config.muxers.firstWhere(
(m) => m.id == chosenMuxerId,
orElse: () => throw Exception("Selected muxer protocol $chosenMuxerId not found in config"),
);
final p2p_mux.Multiplexer p2pMultiplexerInstance = muxerEntry.muxerFactory(
securedConn,
true, // isClient = true
);
final PeerScope peerScope = await resourceManager.viewPeer(
securedConn.remotePeer,
(ps) async => ps
);
final core_mux.MuxedConn muxedConnection = await p2pMultiplexerInstance.newConnOnTransport(
securedConn,
false, // isServer = false for outbound
peerScope,
);
final PublicKey localPublicKey;
if (config.peerKey != null) {
localPublicKey = config.peerKey!.publicKey;
} else {
final tempKeyPair = await generateEd25519KeyPair();
localPublicKey = tempKeyPair.publicKey;
}
final PeerId localPId = PeerId.fromPublicKey(localPublicKey);
return UpgradedConnectionImpl(
muxedConn: muxedConnection,
securedConn: securedConn,
negotiatedSecurityProto: chosenSecurityId,
negotiatedMuxerProto: chosenMuxerId,
localPeerId: localPId,
remotePeerId: securedConn.remotePeer,
);
} catch (e) {
await connection.close();
rethrow;
}
}