connect method

Future<void> connect(ClientTransport transport)

Connects the client to the given transport and then runs initialization.

Implementation

/// Connects this client to [transport] and then runs [initialize].
///
/// Subscribes to the transport's message stream and close future, forwards
/// messages from the internal message controller to `_processMessage`, and
/// awaits [initialize]. When initialization succeeds and both the server
/// info and server capabilities are available, a [ServerInfo] event is added
/// to the connect stream.
///
/// Throws an [McpError] if the client already has a transport or if another
/// connection attempt is in progress. If [initialize] throws, the failure is
/// reported on the error stream, [disconnect] is called, and the original
/// error is rethrown to the caller.
Future<void> connect(ClientTransport transport) async {
  if (_transport != null) {
    throw McpError('Client is already connected to a transport');
  }

  if (_connecting) {
    throw McpError('Client is already connecting to a transport');
  }

  _connecting = true;
  _transport = transport;
  transport.onMessage.listen(_handleMessage);

  // A close/error notification only matters while this exact transport is
  // still the active one. Checking identity (rather than just non-null)
  // keeps a late notification from a previous transport from tearing down a
  // newer connection.
  bool isActiveTransport() => identical(_transport, transport);

  transport.onClose
      .then((_) {
        // Only send disconnect event if we're still connected
        if (isActiveTransport()) {
          _disconnectStreamController.add(DisconnectReason.transportClosed);
          _onDisconnect();
        }
      })
      .catchError((error) {
        // Only handle error if we're still connected
        if (isActiveTransport()) {
          _errorStreamController.add(McpError('Transport error: $error'));
          _disconnectStreamController.add(DisconnectReason.transportError);
          _onDisconnect();
        }
      });

  // Set up message handling. Failures while processing a single message are
  // logged and surfaced on the error stream instead of escaping the listener.
  _messageController.stream.listen((message) async {
    try {
      await _processMessage(message);
    } catch (e) {
      _logger.debug('Error processing message: $e');
      _errorStreamController.add(McpError('Error processing message: $e'));
    }
  });

  // Initialize the connection
  try {
    await initialize();
    _connecting = false;

    // Emit connection event after successful initialization. Local copies
    // let the null checks promote, so no null assertions are needed.
    final serverInfo = _serverInfo;
    final serverCapabilities = _serverCapabilities;
    if (_initialized && serverInfo != null && serverCapabilities != null) {
      _connectStreamController.add(
        ServerInfo(
          name: serverInfo['name'] as String? ?? 'Unknown',
          version: serverInfo['version'] as String? ?? 'Unknown',
          capabilities: serverCapabilities.toJson(),
          protocolVersion: protocolVersion,
        ),
      );
    }
  } catch (e) {
    // Clear the connecting flag before tearing down so the client is not
    // left in a "connecting" state after a failed attempt.
    _connecting = false;
    _errorStreamController.add(McpError('Initialization error: $e'));
    disconnect();
    rethrow;
  }
}