connect method
Connect the client to a transport
Implementation
Future<void> connect(ClientTransport transport) async {
if (_transport != null) {
throw McpError('Client is already connected to a transport');
}
if (_connecting) {
throw McpError('Client is already connecting to a transport');
}
_connecting = true;
_transport = transport;
_transport!.onMessage.listen(_handleMessage);
_transport!.onClose
.then((_) {
// Only send disconnect event if we're still connected
if (_transport != null) {
_disconnectStreamController.add(DisconnectReason.transportClosed);
_onDisconnect();
}
})
.catchError((error) {
// Only handle error if we're still connected
if (_transport != null) {
_errorStreamController.add(McpError('Transport error: $error'));
_disconnectStreamController.add(DisconnectReason.transportError);
_onDisconnect();
}
});
// Set up message handling
_messageController.stream.listen((message) async {
try {
await _processMessage(message);
} catch (e) {
_logger.debug('Error processing message: $e');
_errorStreamController.add(McpError('Error processing message: $e'));
}
});
// Initialize the connection
try {
await initialize();
_connecting = false;
// Emit connection event after successful initialization
if (_initialized && _serverInfo != null && _serverCapabilities != null) {
_connectStreamController.add(
ServerInfo(
name: _serverInfo!['name'] as String? ?? 'Unknown',
version: _serverInfo!['version'] as String? ?? 'Unknown',
capabilities: _serverCapabilities!.toJson(),
protocolVersion: protocolVersion,
),
);
}
} catch (e) {
_connecting = false;
_errorStreamController.add(McpError('Initialization error: $e'));
disconnect();
rethrow;
}
}