dialPeer method
Establishes a connection to a given peer.
Implementation
/// Establishes a connection to [peerId], reusing a healthy existing
/// connection when one is available.
///
/// Connections already registered for the peer are checked first: closed or
/// unhealthy ones are removed from the connection map and closed in the
/// background, and the first healthy one is returned. Otherwise every
/// dialable address the peerstore holds for the peer is tried in order until
/// one can be dialed, upgraded and registered.
///
/// Throws an [Exception] if the swarm is closed, if [peerId] is the local
/// peer, if the peerstore holds no addresses (or no dialable addresses) for
/// the peer, or if every dial attempt fails. In the last case the most recent
/// error is thrown, wrapped in an [Exception] if it was not one already.
@override
Future<Conn> dialPeer(Context context, PeerId peerId) async {
  _logger.warning('Swarm.dialPeer: Entered for peer $peerId. Context: ${context.hashCode}');

  // Debug peer ID information.
  // NOTE(review): this dump walks the whole connection map and logs at
  // warning level on every dial; levels are kept as-is here — confirm whether
  // it should be demoted once the peer-ID investigation is finished.
  _logger.warning('=== SWARM DIAL PEER DEBUG ===');
  _logger.warning('Target peer ID: $peerId');
  _logger.warning('Target peer ID toBase58(): ${peerId.toBase58()}');
  _logger.warning('Target peer ID hashCode: ${peerId.hashCode}');
  _logger.warning('Current connections map keys: ${_connections.keys.toList()}');
  _logger.warning('Total connections in map: ${_connections.length}');
  for (final entry in _connections.entries) {
    _logger.warning(' Connection key: "${entry.key}" -> ${entry.value.length} connections');
    for (final conn in entry.value) {
      _logger.warning(' Conn ${conn.id}: remotePeer=${conn.remotePeer}, remotePeer.toString()="${conn.remotePeer}", isClosed=${conn.isClosed}');
    }
  }
  _logger.warning('=== END SWARM DIAL PEER DEBUG ===');

  // Check if we're closed
  if (_isClosed) {
    throw Exception('Swarm is closed');
  }

  // Prevent self-dialing
  if (peerId == _localPeer) {
    _logger.fine('Preventing self-dial attempt to $peerId');
    throw Exception('Cannot dial self: $peerId');
  }

  // Check if we already have a connection to this peer.
  // The connection map is keyed by the peer ID's string form.
  final peerIDStr = peerId.toString();
  _logger.warning('Looking up connections for peer ID string: "$peerIDStr"');
  // Take a snapshot under the lock so the health check below never iterates
  // the live list stored in the map.
  final existingConns = await _connLock.synchronized(
    () => <SwarmConn>[...?_connections[peerIDStr]],
  );
  _logger.warning('Found ${existingConns.length} existing connections for peer ID string: "$peerIDStr"');

  if (existingConns.isNotEmpty) {
    _logger.warning('Swarm.dialPeer: Found ${existingConns.length} existing connection(s) for peer $peerId. Validating health...');

    // Partition into healthy and closed/unhealthy connections.
    final healthyConns = <SwarmConn>[];
    final staleConns = <SwarmConn>[];
    for (final conn in existingConns) {
      if (conn.isClosed || !_isConnectionHealthy(conn)) {
        staleConns.add(conn);
        _logger.warning('Swarm.dialPeer: Connection ${conn.id} to peer $peerId is stale/closed');
      } else {
        healthyConns.add(conn);
      }
    }

    // Clean up stale connections
    if (staleConns.isNotEmpty) {
      _logger.warning('Swarm.dialPeer: Cleaning up ${staleConns.length} stale connection(s) for peer $peerId');

      // Edit the map under the connection lock, like every other mutation of
      // it in this method. The lock is not held at this point, so acquiring
      // it cannot self-deadlock. The full removeConnection path is still
      // deliberately not used here.
      await _connLock.synchronized(() {
        final conns = _connections[peerIDStr];
        if (conns == null) {
          return;
        }
        for (final staleConn in staleConns) {
          conns.remove(staleConn);
        }
        if (conns.isEmpty) {
          _connections.remove(peerIDStr);
        }
      });

      // Close the stale connections in the background, after the lock has
      // been released, so a slow close does not block this dial.
      for (final staleConn in staleConns) {
        Future.microtask(() async {
          try {
            await staleConn.close();
          } catch (e) {
            _logger.warning('Swarm.dialPeer: Error closing stale connection ${staleConn.id}: $e');
          }
        });
      }
    }

    if (healthyConns.isNotEmpty) {
      _logger.warning('Swarm.dialPeer: Found healthy connection for peer $peerId. Returning connection ID: ${healthyConns.first.id}');
      return healthyConns.first;
    }
    _logger.warning('Swarm.dialPeer: No healthy connections found for peer $peerId. Will create new connection.');
  }

  _logger.warning('Swarm.dialPeer: No existing connection found for peer $peerId. Attempting new dial.');

  // Get addresses for the peer
  final allAddrs = await _peerstore.addrBook.addrs(peerId);
  if (allAddrs.isEmpty) {
    _logger.warning('Swarm.dialPeer: No addresses found in peerstore for peer: $peerId');
    throw Exception('No addresses found for peer: $peerId');
  }

  // Filter out non-dialable addresses before attempting to connect: the
  // unspecified addresses 0.0.0.0 (ip4) and :: (ip6) are skipped.
  final dialableAddrs = allAddrs
      .where((addr) =>
          addr.valueForProtocol('ip4') != '0.0.0.0' &&
          addr.valueForProtocol('ip6') != '::')
      .toList();
  if (dialableAddrs.isEmpty) {
    _logger.warning('Swarm.dialPeer: No dialable addresses found for peer: $peerId. Original addrs: $allAddrs');
    throw Exception('No dialable addresses found for peer: $peerId');
  }
  _logger.warning('Swarm.dialPeer: Found dialable addresses for peer $peerId: $dialableAddrs. Trying them...');

  // Try each address until we connect
  Exception? lastError;
  for (final addr in dialableAddrs) {
    try {
      // Find the first registered transport that can dial this address.
      Transport? transport;
      for (final t in _transports) {
        if (t.canDial(addr)) {
          transport = t;
          break;
        }
      }
      if (transport == null) {
        _logger.warning('No transport found for address: $addr');
        continue;
      }

      // Dial the address
      final transportConn = await transport.dial(addr);
      _logger.fine('transport.dial return :[$transportConn]');

      // Upgrade the raw transport connection
      final Conn upgradedConn;
      try {
        upgradedConn = await _upgrader.upgradeOutbound(
          connection: transportConn as TransportConn,
          remotePeerId: peerId, // The target peerId
          config: _config,
          remoteAddr: transportConn.remoteMultiaddr, // The address we dialed on the transport
        );
      } catch (e, s) {
        _logger.warning('Outbound connection upgrade failed for $peerId at $addr: $e\n$s');
        await transportConn.close(); // Close the raw transport connection
        lastError = e is Exception ? e : Exception(e.toString());
        continue; // Try next address
      }

      // Everything between a successful upgrade and the SwarmConn taking
      // ownership must close the upgraded connection on failure; otherwise
      // it would be left open with nothing tracking it.
      final SwarmConn swarmConn;
      try {
        // Obtain a management scope for the new outbound connection. The
        // endpoint is the remote multiaddress of the UPGRADED connection.
        final connManagementScope = await _resourceManager.openConnection(
          Direction.outbound,
          true, // usefd
          upgradedConn.remoteMultiaddr,
        );

        // Bind the scope to the peer reported by the upgraded connection.
        // NOTE(review): if setPeer throws, the scope opened above is not
        // released here because its release API is not visible from this
        // method — confirm and release it if one exists.
        await connManagementScope.setPeer(upgradedConn.remotePeer);

        // Create a swarm connection with the UPGRADED connection
        final connID = _nextConnID++;
        swarmConn = SwarmConn(
          id: connID.toString(),
          conn: upgradedConn,
          localPeer: _localPeer,
          remotePeer: upgradedConn.remotePeer,
          direction: Direction.outbound,
          swarm: this,
          managementScope: connManagementScope,
        );
      } catch (_) {
        try {
          await upgradedConn.close();
        } catch (closeError) {
          // Do not let a failing close mask the original error.
          _logger.warning('Swarm.dialPeer: Error closing upgraded connection to $peerId at $addr: $closeError');
        }
        rethrow; // Handled by the per-address catch below.
      }

      // Add the connection to our map
      _logger.warning('=== STORING OUTBOUND CONNECTION ===');
      _logger.warning('Storing connection for peer: ${upgradedConn.remotePeer}');
      _logger.warning('Peer ID toString(): "$peerIDStr"');
      _logger.warning('Peer ID toBase58(): ${upgradedConn.remotePeer.toBase58()}');
      _logger.warning('Connection ID: ${swarmConn.id}');
      _logger.warning('Target peer we intended to dial: $peerId');
      _logger.warning('Target peer toString(): "$peerId"');
      _logger.warning('Target peer toBase58(): ${peerId.toBase58()}');
      _logger.warning('Are they equal? ${upgradedConn.remotePeer == peerId}');
      _logger.warning('=== END STORING OUTBOUND CONNECTION ===');

      // The entry is keyed by the peer ID that was requested, not by the
      // peer ID reported by the upgraded connection.
      await _connLock.synchronized(() {
        final conns = _connections.putIfAbsent(peerIDStr, () => []);
        conns.add(swarmConn);
        _logger.warning('Outbound connection stored. Total connections for "$peerIDStr": ${conns.length}');
      });

      // Notify connection opened
      await _notifieeLock.synchronized(() async {
        for (final notifiee in _notifiees) {
          notifiee.connected(this, swarmConn);
        }
      });

      // Handle incoming streams
      _handleIncomingStreams(swarmConn);

      return swarmConn;
    } catch (e, s) {
      // Remember the failure and fall through to the next address.
      lastError = e is Exception ? e : Exception(e.toString());
      _logger.warning('Error dialing $addr: $e\n$s');
    }
  }

  throw lastError ?? Exception('Failed to dial peer: $peerId');
}