connect method

void connect(
  1. ServerTransport transport
)

Connect the server to a transport

Implementation

void connect(ServerTransport transport) {
  if (_transport != null) {
    _logger.debug('Server already has a transport connected');

    // Handle STDIO transport specifically
    if (transport is StdioServerTransport && _transport is StdioServerTransport) {
      _logger.debug('Attempting to connect another STDIO transport - reusing existing connection');
      // Use existing transport instead of the new one
      transport = _transport as StdioServerTransport;
    } else {
      throw McpError('Server is already connected to a transport');
    }
  }

  _transport = transport;

  // Determine if this is a multi-session transport (StreamableHTTP)
  final isMultiSessionTransport = transport is StreamableHttpServerTransport;

  // Wire RFC 9728 metadata into the streamable HTTP transport so the
  // spec route `/.well-known/oauth-protected-resource` resolves to the
  // current snapshot of `protectedResourceMetadata`. Using a callback
  // (rather than a one-shot copy) so a later
  // `configureProtectedResource(...)` is reflected without reconnect.
  if (isMultiSessionTransport) {
    transport.setProtectedResourceMetadataProvider(
      () => protectedResourceMetadata,
    );
  }

  // Create initial session only for single-session transports (for backward compatibility)
  String? initialSessionId;
  if (!isMultiSessionTransport) {
    initialSessionId = _createSession(transport);
  }

  try {
    transport.onMessage.listen((rawMessage) {
      // Extract session ID from message metadata (if present)
      String? messageSessionId;

      if (rawMessage is Map && rawMessage.containsKey('_sessionId')) {
        // Multi-session transport: use session ID from message
        messageSessionId = rawMessage['_sessionId'] as String;

        // Create session if it doesn't exist
        if (!_sessions.containsKey(messageSessionId)) {
          _logger.debug('Creating new session from transport message: $messageSessionId');
          final session = ClientSession(
            id: messageSessionId,
            connectedAt: DateTime.now(),
            transport: transport,  // IMPORTANT: Set transport reference for sending responses
          );
          session.capabilities = {};
          _sessions[messageSessionId] = session;

          // Update metrics
          _metricsCollector.gauge('mcp_connections_active').set(_sessions.length.toDouble());
          _metricsCollector.counter('mcp_connections_total').increment();

          // Emit the connect event so consumers can track sessions —
          // the single-session path goes through `_createSession`
          // which fires this; the multi-session inline path used by
          // streamable HTTP / SSE was previously skipping it,
          // leaving `server.onConnect` silent for those transports.
          if (!_connectStreamController.isClosed) {
            _connectStreamController.add(session);
          }
        }
      } else {
        // Single-session transport: use initial session ID
        messageSessionId = initialSessionId;
      }

      if (messageSessionId == null) {
        _logger.error('No session ID available for message: $rawMessage');
        return;
      }

      _handleMessage(messageSessionId, rawMessage);
    }, onError: (error) {
      _logger.error('Error from transport message stream: $error');
    });

    transport.onClose.then((_) {
      // Remove all sessions associated with this transport
      final sessionIds = List<String>.from(_sessions.keys);
      for (final sid in sessionIds) {
        _removeSession(sid);
      }
      _onDisconnect();
    }).catchError((error) {
      _logger.error('Transport close error: $error');
      // Remove all sessions associated with this transport
      final sessionIds = List<String>.from(_sessions.keys);
      for (final sid in sessionIds) {
        _removeSession(sid);
      }
      _onDisconnect();
    });
  } catch (e) {
    _logger.error('Failed to setup transport listeners: $e');
  }

  // Set up message processing with better error handling
  _messageController.stream.listen((message) async {
    try {
      await _processMessage(message.sessionId, message);
    } catch (e, stackTrace) {
      _logger.error('Error processing message: $e');
      _logger.debug('Stack trace: $stackTrace');

      try {
        _sendErrorResponse(message.sessionId, message.id, ErrorCode.internalError, 'Internal error: $e');
      } catch (sendError) {
        _logger.error('Failed to send error response: $sendError');
      }
    }
  }, onError: (error) {
    _logger.error('Error in message stream: $error');
  });
}