connect method
Connect the server to a transport
Implementation
void connect(ServerTransport transport) {
if (_transport != null) {
_logger.debug('Server already has a transport connected');
// Handle STDIO transport specifically
if (transport is StdioServerTransport && _transport is StdioServerTransport) {
_logger.debug('Attempting to connect another STDIO transport - reusing existing connection');
// Use existing transport instead of the new one
transport = _transport as StdioServerTransport;
} else {
throw McpError('Server is already connected to a transport');
}
}
_transport = transport;
// Determine if this is a multi-session transport (StreamableHTTP)
final isMultiSessionTransport = transport is StreamableHttpServerTransport;
// Wire RFC 9728 metadata into the streamable HTTP transport so the
// spec route `/.well-known/oauth-protected-resource` resolves to the
// current snapshot of `protectedResourceMetadata`. Using a callback
// (rather than a one-shot copy) so a later
// `configureProtectedResource(...)` is reflected without reconnect.
if (isMultiSessionTransport) {
transport.setProtectedResourceMetadataProvider(
() => protectedResourceMetadata,
);
}
// Create initial session only for single-session transports (for backward compatibility)
String? initialSessionId;
if (!isMultiSessionTransport) {
initialSessionId = _createSession(transport);
}
try {
transport.onMessage.listen((rawMessage) {
// Extract session ID from message metadata (if present)
String? messageSessionId;
if (rawMessage is Map && rawMessage.containsKey('_sessionId')) {
// Multi-session transport: use session ID from message
messageSessionId = rawMessage['_sessionId'] as String;
// Create session if it doesn't exist
if (!_sessions.containsKey(messageSessionId)) {
_logger.debug('Creating new session from transport message: $messageSessionId');
final session = ClientSession(
id: messageSessionId,
connectedAt: DateTime.now(),
transport: transport, // IMPORTANT: Set transport reference for sending responses
);
session.capabilities = {};
_sessions[messageSessionId] = session;
// Update metrics
_metricsCollector.gauge('mcp_connections_active').set(_sessions.length.toDouble());
_metricsCollector.counter('mcp_connections_total').increment();
// Emit the connect event so consumers can track sessions —
// the single-session path goes through `_createSession`
// which fires this; the multi-session inline path used by
// streamable HTTP / SSE was previously skipping it,
// leaving `server.onConnect` silent for those transports.
if (!_connectStreamController.isClosed) {
_connectStreamController.add(session);
}
}
} else {
// Single-session transport: use initial session ID
messageSessionId = initialSessionId;
}
if (messageSessionId == null) {
_logger.error('No session ID available for message: $rawMessage');
return;
}
_handleMessage(messageSessionId, rawMessage);
}, onError: (error) {
_logger.error('Error from transport message stream: $error');
});
transport.onClose.then((_) {
// Remove all sessions associated with this transport
final sessionIds = List<String>.from(_sessions.keys);
for (final sid in sessionIds) {
_removeSession(sid);
}
_onDisconnect();
}).catchError((error) {
_logger.error('Transport close error: $error');
// Remove all sessions associated with this transport
final sessionIds = List<String>.from(_sessions.keys);
for (final sid in sessionIds) {
_removeSession(sid);
}
_onDisconnect();
});
} catch (e) {
_logger.error('Failed to setup transport listeners: $e');
}
// Set up message processing with better error handling
_messageController.stream.listen((message) async {
try {
await _processMessage(message.sessionId, message);
} catch (e, stackTrace) {
_logger.error('Error processing message: $e');
_logger.debug('Stack trace: $stackTrace');
try {
_sendErrorResponse(message.sessionId, message.id, ErrorCode.internalError, 'Internal error: $e');
} catch (sendError) {
_logger.error('Failed to send error response: $sendError');
}
}
}, onError: (error) {
_logger.error('Error in message stream: $error');
});
}