upgradeInbound method
Future<Conn>
upgradeInbound({
- required TransportConn connection,
- required Config config,
override
Upgrades an inbound connection.
This method orchestrates the entire upgrade process for an incoming connection:
- Negotiates and applies a security protocol.
- Negotiates and applies a stream multiplexer over the secured connection.
Returns a fully upgraded Conn ready for use by the swarm.
Parameters:
- connection: The raw TransportConn to upgrade.
- config: The node's Config containing security and muxer options.
Implementation
/// Upgrades an inbound raw [connection] into a secured, multiplexed [Conn].
///
/// The upgrade runs in this order:
/// 1. Verifies that [config] lists at least one security protocol and one
///    muxer, before any bytes are exchanged with the remote.
/// 2. Negotiates a security protocol with a `MultistreamMuxer` and secures
///    the connection through the chosen module's `secureInbound`.
/// 3. Negotiates a muxer over the secured connection and opens the muxed
///    connection under the remote peer's resource scope.
/// 4. Wraps the result in an `UpgradedConnectionImpl`.
///
/// For a `RelayedConn` the connection's existing context is reused; for any
/// other connection a direct context is created once the remote peer is known.
///
/// Throws an [Exception] when no security protocol or muxer is configured, or
/// when the negotiated protocol id is not present in [config]. Any error
/// raised during the upgrade is rethrown after [connection] has been closed.
@override
Future<Conn> upgradeInbound({
  required TransportConn connection,
  required Config config,
}) async {
  try {
    // Fail fast on unusable configuration. Both checks depend only on local
    // config, so there is no reason to run a security handshake first. The
    // catch block below closes the connection exactly once.
    if (config.securityProtocols.isEmpty) {
      throw Exception(
          "No security protocols configured for inbound connection");
    }
    if (config.muxers.isEmpty) {
      throw Exception("No muxers configured for inbound connection");
    }

    // Relayed connections already carry a context. For direct connections the
    // remote peer is unknown until security negotiation finishes, so the
    // context is created further down.
    ConnectionContext? connContext;
    if (connection is RelayedConn) {
      connContext = connection.context;
    }

    // --- Security negotiation over the raw connection ---
    final mssForSecurity = MultistreamMuxer();
    final negotiationSecStream =
        NegotiationStreamWrapper(connection, '/sec-negotiator-in');
    final securityProtoCompleter = Completer<ProtocolID>();
    for (final sp in config.securityProtocols) {
      mssForSecurity.addHandler(sp.protocolId,
          (ProtocolID p, P2PStream s) async {
        // Record only the first protocol that gets selected.
        if (!securityProtoCompleter.isCompleted) {
          securityProtoCompleter.complete(p);
        }
      });
    }
    await mssForSecurity.handle(negotiationSecStream);
    // NOTE(review): assumes handle() has invoked the selected handler by the
    // time it completes; otherwise this await never finishes — TODO confirm.
    final chosenSecurityId = await securityProtoCompleter.future;
    final securityModule = config.securityProtocols.firstWhere(
      (s) => s.protocolId == chosenSecurityId,
      orElse: () => throw Exception(
          "Client proposed security protocol $chosenSecurityId not found/supported"),
    );
    final SecuredConnection securedConn =
        await securityModule.secureInbound(connection);

    // --- Muxer negotiation over the secured connection ---
    final mssForMuxers = MultistreamMuxer();
    final negotiationMuxStream =
        NegotiationStreamWrapper(securedConn, '/mux-negotiator-in');
    final muxerProtoCompleter = Completer<ProtocolID>();
    for (final m in config.muxers) {
      mssForMuxers.addHandler(m.id, (ProtocolID p, P2PStream s) async {
        if (!muxerProtoCompleter.isCompleted) {
          muxerProtoCompleter.complete(p);
        }
      });
    }
    await mssForMuxers.handle(negotiationMuxStream);
    // NOTE(review): same assumption about handle() as for security above.
    final chosenMuxerId = await muxerProtoCompleter.future;
    final muxerEntry = config.muxers.firstWhere(
      (m) => m.id == chosenMuxerId,
      orElse: () => throw Exception(
          "Client proposed muxer $chosenMuxerId not found/supported"),
    );

    final p2p_mux.Multiplexer p2pMultiplexerInstance =
        muxerEntry.muxerFactory(
      securedConn,
      false, // isClient = false
    );
    final PeerScope peerScope = await resourceManager.viewPeer(
        securedConn.remotePeer, (ps) async => ps);
    final core_mux.MuxedConn muxedConnection =
        await p2pMultiplexerInstance.newConnOnTransport(
      securedConn,
      true, // isServer = true for inbound
      peerScope,
    );

    // Derive the local peer id from the configured key. When no key is
    // configured, a freshly generated Ed25519 key pair is used instead.
    final peerKey = config.peerKey;
    final PublicKey localPublicKey = peerKey != null
        ? peerKey.publicKey
        : (await generateEd25519KeyPair()).publicKey;
    final PeerId localPId = PeerId.fromPublicKey(localPublicKey);

    // Direct inbound connections get their context now that the remote peer
    // is known.
    if (connContext == null && connection is! RelayedConn) {
      connContext = ConnectionContext.direct(
        remotePeerId: securedConn.remotePeer.toBase58(),
        transportConnectionId: connection.id,
      );
    }

    return UpgradedConnectionImpl(
      muxedConn: muxedConnection,
      securedConn: securedConn,
      negotiatedSecurityProto: chosenSecurityId,
      negotiatedMuxerProto: chosenMuxerId,
      localPeerId: localPId,
      remotePeerId: securedConn.remotePeer,
      context: connContext,
    );
  } catch (_) {
    // Best-effort cleanup: a failure while closing must not replace the
    // error that actually aborted the upgrade.
    try {
      await connection.close();
    } catch (_) {
      // Deliberately ignored; the original error is rethrown below.
    }
    rethrow;
  }
}