upgradeInbound method
Future<Conn>
upgradeInbound({
- required TransportConn connection,
- required Config config,
override
Upgrades an inbound connection.
This method orchestrates the entire upgrade process for an incoming connection:
- Negotiates and applies a security protocol.
- Negotiates and applies a stream multiplexer over the secured connection.
Returns a fully upgraded Conn ready for use by the swarm.
Parameters:
connection
: The raw TransportConn to upgrade.config
: The node's Config containing security and muxer options.
Implementation
@override
Future<Conn> upgradeInbound({
required TransportConn connection,
required Config config,
}) async {
try {
final mssForSecurity = MultistreamMuxer();
final negotiationSecStream = NegotiationStreamWrapper(connection, '/sec-negotiator-in');
final Completer<ProtocolID> securityProtoCompleter = Completer();
if (config.securityProtocols.isEmpty) {
await connection.close();
throw Exception("No security protocols configured for inbound connection");
}
for (final sp in config.securityProtocols) {
mssForSecurity.addHandler(sp.protocolId, (ProtocolID p, P2PStream s) async {
if (!securityProtoCompleter.isCompleted) {
securityProtoCompleter.complete(p);
}
});
}
await mssForSecurity.handle(negotiationSecStream);
final chosenSecurityId = await securityProtoCompleter.future;
final securityModule = config.securityProtocols.firstWhere(
(s) => s.protocolId == chosenSecurityId,
orElse: () => throw Exception("Client proposed security protocol $chosenSecurityId not found/supported"),
);
final SecuredConnection securedConn = await securityModule.secureInbound(connection);
final mssForMuxers = MultistreamMuxer();
final negotiationMuxStream = NegotiationStreamWrapper(securedConn, '/mux-negotiator-in');
final Completer<ProtocolID> muxerProtoCompleter = Completer();
if (config.muxers.isEmpty) {
await securedConn.close();
throw Exception("No muxers configured for inbound connection");
}
for (final m in config.muxers) {
mssForMuxers.addHandler(m.id, (ProtocolID p, P2PStream s) async {
if (!muxerProtoCompleter.isCompleted) {
muxerProtoCompleter.complete(p);
}
});
}
await mssForMuxers.handle(negotiationMuxStream);
final chosenMuxerId = await muxerProtoCompleter.future;
final muxerEntry = config.muxers.firstWhere(
(m) => m.id == chosenMuxerId,
orElse: () => throw Exception("Client proposed muxer $chosenMuxerId not found/supported"),
);
final p2p_mux.Multiplexer p2pMultiplexerInstance = muxerEntry.muxerFactory(
securedConn,
false, // isClient = false
);
final PeerScope peerScope = await resourceManager.viewPeer(
securedConn.remotePeer,
(ps) async => ps
);
final core_mux.MuxedConn muxedConnection = await p2pMultiplexerInstance.newConnOnTransport(
securedConn,
true, // isServer = true for inbound
peerScope,
);
final PublicKey localPublicKey;
if (config.peerKey != null) {
localPublicKey = config.peerKey!.publicKey;
} else {
final tempKeyPair = await generateEd25519KeyPair();
localPublicKey = tempKeyPair.publicKey;
}
final PeerId localPId = PeerId.fromPublicKey(localPublicKey);
return UpgradedConnectionImpl(
muxedConn: muxedConnection,
securedConn: securedConn,
negotiatedSecurityProto: chosenSecurityId,
negotiatedMuxerProto: chosenMuxerId,
localPeerId: localPId,
remotePeerId: securedConn.remotePeer,
);
} catch (e) {
await connection.close();
rethrow;
}
}