dialPeer method

  1. @override
Future<Conn> dialPeer(
  1. Context context,
  2. PeerId peerId
)
override

Establishes a connection to a given peer

Implementation

@override
Future<Conn> dialPeer(Context context, PeerId peerId) async {
  _logger.warning('Swarm.dialPeer: Entered for peer ${peerId.toString()}. Context: ${context.hashCode}');

  // Debug peer ID information
  _logger.warning('=== SWARM DIAL PEER DEBUG ===');
  _logger.warning('Target peer ID: ${peerId.toString()}');
  _logger.warning('Target peer ID toBase58(): ${peerId.toBase58()}');
  _logger.warning('Target peer ID hashCode: ${peerId.hashCode}');
  _logger.warning('Current connections map keys: ${_connections.keys.toList()}');
  _logger.warning('Total connections in map: ${_connections.length}');
  for (final entry in _connections.entries) {
    _logger.warning('  Connection key: "${entry.key}" -> ${entry.value.length} connections');
    for (final conn in entry.value) {
      _logger.warning('    Conn ${conn.id}: remotePeer=${conn.remotePeer}, remotePeer.toString()="${conn.remotePeer.toString()}", isClosed=${conn.isClosed}');
    }
  }
  _logger.warning('=== END SWARM DIAL PEER DEBUG ===');

  // Check if we're closed
  if (_isClosed) {
    throw Exception('Swarm is closed');
  }

  // Prevent self-dialing
  if (peerId == _localPeer) {
    _logger.fine('Preventing self-dial attempt to ${peerId}');
    throw Exception('Cannot dial self: $peerId');
  }

  // Check if we already have a connection to this peer
  final peerIDStr = peerId.toString();
  _logger.warning('Looking up connections for peer ID string: "$peerIDStr"');
  final existingConns = await _connLock.synchronized(() {
    return _connections[peerIDStr] ?? [];
  });
  _logger.warning('Found ${existingConns.length} existing connections for peer ID string: "$peerIDStr"');

  if (existingConns.isNotEmpty) {
    _logger.warning('Swarm.dialPeer: Found ${existingConns.length} existing connection(s) for peer ${peerId.toString()}. Validating health...');

    // Filter out closed/unhealthy connections
    final healthyConns = <SwarmConn>[];
    final staleConns = <SwarmConn>[];

    for (final conn in existingConns) {
      if (conn.isClosed || !_isConnectionHealthy(conn)) {
        staleConns.add(conn);
        _logger.warning('Swarm.dialPeer: Connection ${conn.id} to peer ${peerId.toString()} is stale/closed');
      } else {
        healthyConns.add(conn);
      }
    }

    // Clean up stale connections
    if (staleConns.isNotEmpty) {
      _logger.warning('Swarm.dialPeer: Cleaning up ${staleConns.length} stale connection(s) for peer ${peerId.toString()}');
      for (final staleConn in staleConns) {
        // Remove from connections map without calling full removeConnection to avoid deadlock
        final conns = _connections[peerIDStr] ?? [];
        conns.remove(staleConn);
        if (conns.isEmpty) {
          _connections.remove(peerIDStr);
        }
        // Schedule cleanup without awaiting to avoid blocking
        Future.microtask(() async {
          try {
            await staleConn.close();
          } catch (e) {
            _logger.warning('Swarm.dialPeer: Error closing stale connection ${staleConn.id}: $e');
          }
        });
      }
    }

    if (healthyConns.isNotEmpty) {
      _logger.warning('Swarm.dialPeer: Found healthy connection for peer ${peerId.toString()}. Returning connection ID: ${healthyConns.first.id}');
      return healthyConns.first;
    } else {
      _logger.warning('Swarm.dialPeer: No healthy connections found for peer ${peerId.toString()}. Will create new connection.');
    }
  }
  _logger.warning('Swarm.dialPeer: No existing connection found for peer ${peerId.toString()}. Attempting new dial.');

  // Get addresses for the peer
  final allAddrs = await _peerstore.addrBook.addrs(peerId);
  if (allAddrs.isEmpty) {
    _logger.warning('Swarm.dialPeer: No addresses found in peerstore for peer: $peerId');
    throw Exception('No addresses found for peer: $peerId');
  }

  // Filter out non-dialable addresses before attempting to connect.
  final dialableAddrs = allAddrs.where((addr) {
    final ip4Val = addr.valueForProtocol('ip4');
    if (ip4Val == '0.0.0.0') {
      return false;
    }
    final ip6Val = addr.valueForProtocol('ip6');
    if (ip6Val == '::') {
      return false;
    }
    return true;
  }).toList();

  if (dialableAddrs.isEmpty) {
    _logger.warning('Swarm.dialPeer: No dialable addresses found for peer: $peerId. Original addrs: $allAddrs');
    throw Exception('No dialable addresses found for peer: $peerId');
  }

  _logger.warning('Swarm.dialPeer: Found dialable addresses for peer $peerId: $dialableAddrs. Trying them...');

  // Try each address until we connect
  Exception? lastError;
  for (final addr in dialableAddrs) {
    try {
      // Find a transport that can dial this address
      Transport? transport;
      for (final t in _transports) {
        if (t.canDial(addr)) {
          transport = t;
          break;
        }
      }

      if (transport == null) {
        _logger.warning('No transport found for address: $addr');
        continue;
      }

      // Dial the address
      final transportConn = await transport.dial(addr);

      _logger.fine("transport.dial return :[${transportConn}]");
      // Upgrade the raw transport connection
      final Conn upgradedConn;
      try {
        upgradedConn = await _upgrader.upgradeOutbound(
          connection: transportConn as TransportConn,
          remotePeerId: peerId, // The target peerId
          config: _config,
          remoteAddr: transportConn.remoteMultiaddr, // The address we dialed on the transport
        );
      } catch (e, s) {
        _logger.warning('Outbound connection upgrade failed for $peerId at $addr: $e\n$s');
        await transportConn.close(); // Close the raw transport connection
        lastError = e is Exception ? e : Exception(e.toString());
        continue; // Try next address
      }

      // Obtain a ConnManagementScope for the new outbound connection
      // The endpoint is the remote multiaddress of the UPGRADED connection.
      final connManagementScope = await _resourceManager.openConnection(
        Direction.outbound,
        true, // usefd
        upgradedConn.remoteMultiaddr,
      );

      // Set peer on scope (remotePeerId should be available from upgradedConn)
      // This should match the 'peerId' we intended to dial.
      await connManagementScope.setPeer(upgradedConn.remotePeer);

      // Create a swarm connection with the UPGRADED connection
      final connID = _nextConnID++;
      final swarmConn = SwarmConn(
        id: connID.toString(),
        conn: upgradedConn, // Use the UPGRADED connection
        localPeer: _localPeer, // Swarm's local peer
        remotePeer: upgradedConn.remotePeer, // PeerId from upgraded connection
        direction: Direction.outbound,
        swarm: this,
        managementScope: connManagementScope,
      );

      // Add the connection to our map
      _logger.warning('=== STORING OUTBOUND CONNECTION ===');
      _logger.warning('Storing connection for peer: ${upgradedConn.remotePeer}');
      _logger.warning('Peer ID toString(): "$peerIDStr"');
      _logger.warning('Peer ID toBase58(): ${upgradedConn.remotePeer.toBase58()}');
      _logger.warning('Connection ID: ${swarmConn.id}');
      _logger.warning('Target peer we intended to dial: $peerId');
      _logger.warning('Target peer toString(): "${peerId.toString()}"');
      _logger.warning('Target peer toBase58(): ${peerId.toBase58()}');
      _logger.warning('Are they equal? ${upgradedConn.remotePeer == peerId}');
      _logger.warning('=== END STORING OUTBOUND CONNECTION ===');

      await _connLock.synchronized(() {
        if (!_connections.containsKey(peerIDStr)) {
          _connections[peerIDStr] = [];
        }
        _connections[peerIDStr]!.add(swarmConn);
        _logger.warning('Outbound connection stored. Total connections for "$peerIDStr": ${_connections[peerIDStr]!.length}');
      });

      // Notify connection opened
      await _notifieeLock.synchronized(() async {
        for (final notifiee in _notifiees) {
          notifiee.connected(this, swarmConn);
        }
      });

      // Handle incoming streams
      _handleIncomingStreams(swarmConn);

      return swarmConn;
    } catch (e, s) {
      lastError = e is Exception ? e : Exception(e.toString());
      _logger.warning('Error dialing $addr: $e\n$s');
    }
  }

  throw lastError ?? Exception('Failed to dial peer: $peerId');
}