connect method

@override
Future<void> connect(
  String url, {
  Map<String, String>? headers,
  dynamic onOpen(String?)?,
  dynamic onMessage(dynamic)?,
  dynamic onError(dynamic)?,
  dynamic onEndpoint(String?)?,
  dynamic onAuthFailure(int, String)?,
})

Connects to an SSE endpoint.

Implementation

/// Opens an SSE connection to [url] and starts dispatching incoming data to
/// the supplied callbacks.
///
/// A GET request is issued to [url]; every entry of [headers], when given,
/// is set on that request before it is sent.
///
/// Callbacks:
/// * [onOpen] is invoked with the event data when an `endpoint` event is
///   parsed from the stream.
/// * [onMessage] receives either a decoded JSON-RPC value found in the
///   buffered data, or the data of any other parsed SSE event.
/// * [onError] receives connection failures, stream errors, and the string
///   `'Authenticated connection closed'` when the stream ends.
/// * [onAuthFailure] receives the status code and response body when the
///   server answers 401 or 403.
/// * [onEndpoint] is currently not invoked by this method.
///
/// Throws [McpError] if a connection is already active, if the server
/// answers 401/403, or if it answers with any status other than 200. Any
/// other failure raised while connecting is reported to [onError] and then
/// rethrown. On every failure path the HTTP client created for the attempt
/// is force-closed so that no connection is leaked.
@override
Future<void> connect(
  String url, {
  Map<String, String>? headers,
  Function(String?)? onOpen,
  Function(dynamic)? onMessage,
  Function(dynamic)? onError,
  Function(String?)? onEndpoint,
  Function(int, String)? onAuthFailure,
}) async {
  _logger.debug('AuthenticatedEventSource connecting to: $url');
  if (_isConnected) {
    throw McpError('AuthenticatedEventSource is already connected');
  }

  // Keep a local handle so the failure path below can release exactly the
  // client created by this attempt, regardless of later field writes.
  final client = HttpClient();
  _client = client;

  try {
    final request = await client.getUrl(Uri.parse(url));
    _request = request;

    // Set authentication and SSE headers
    if (headers != null) {
      for (final header in headers.entries) {
        request.headers.set(header.key, header.value);
      }
    }

    final response = await request.close();
    _response = response;
    final statusCode = response.statusCode;

    // Handle authentication failures
    if (statusCode == 401 || statusCode == 403) {
      final body = await response.transform(utf8.decoder).join();
      _logger.debug(
        'Authentication failed: $statusCode - $body',
      );

      if (onAuthFailure != null) {
        onAuthFailure(statusCode, body);
      }

      throw McpError('Authentication failed: $statusCode');
    }

    if (statusCode != 200) {
      final body = await response.transform(utf8.decoder).join();
      throw McpError(
        'Failed to connect to authenticated SSE endpoint: $statusCode - $body',
      );
    }

    _isConnected = true;
    _logger.debug('Authenticated EventSource connection established');

    // Set up subscription to process events
    _subscription = response.listen(
      (List<int> data) {
        try {
          // NOTE(review): each chunk is decoded on its own, so a multi-byte
          // UTF-8 sequence split across two chunks is decoded as malformed
          // input rather than reassembled.
          final chunk = utf8.decode(data, allowMalformed: true);
          _logger.debug('Raw authenticated SSE data: [$chunk]');
          _buffer.write(chunk);

          // Everything received so far that has not been consumed yet.
          final content = _buffer.toString();

          // Check for JSON-RPC responses
          if (content.contains('"jsonrpc":"2.0"') ||
              content.contains('"jsonrpc": "2.0"')) {
            _logger.debug(
              'Detected JSON-RPC data in authenticated SSE stream',
            );

            try {
              // Candidate JSON spans from the first '{' to the last '}'.
              final jsonStart = content.indexOf('{');
              final jsonEnd = content.lastIndexOf('}') + 1;

              if (jsonStart >= 0 && jsonEnd > jsonStart) {
                final jsonStr = content.substring(jsonStart, jsonEnd);
                _logger.debug('Extracted authenticated JSON: $jsonStr');

                try {
                  final jsonData = jsonDecode(jsonStr);
                  _logger.debug(
                    'Parsed authenticated JSON-RPC data: $jsonData',
                  );

                  // Clear processed data from buffer, keeping whatever
                  // follows the extracted JSON for the next chunk.
                  _buffer.clear();
                  if (jsonEnd < content.length) {
                    _buffer.write(content.substring(jsonEnd));
                  }

                  // Forward to message handler
                  if (onMessage != null) {
                    onMessage(jsonData);
                  }
                  return;
                } catch (e) {
                  // Not (yet) valid JSON: fall through to SSE processing
                  // and leave the buffer untouched.
                  _logger.debug(
                    'JSON parse error in authenticated stream: $e',
                  );
                }
              }
            } catch (e) {
              _logger.debug(
                'Error extracting JSON from authenticated stream: $e',
              );
            }
          }

          // Process SSE events
          final event = _processBuffer();
          _logger.debug(
            'Processed authenticated SSE event: ${event.event}, data: ${event.data}',
          );

          if (event.event == 'endpoint' && event.data != null) {
            _logger.debug(
              'Received authenticated endpoint event: ${event.data}',
            );
            if (onOpen != null) {
              onOpen(event.data);
            }
          } else if (event.data != null && onMessage != null) {
            onMessage(event.data);
          }
        } catch (e) {
          // A failure while handling one chunk is logged and does not
          // terminate the subscription.
          _logger.debug('Error processing authenticated SSE data: $e');
        }
      },
      onError: (e) {
        _logger.debug('Authenticated EventSource error: $e');
        _isConnected = false;
        if (onError != null) {
          onError(e);
        }
      },
      onDone: () {
        _logger.debug('Authenticated EventSource stream closed');
        _isConnected = false;
        // Stream completion is reported through the error callback.
        if (onError != null) {
          onError('Authenticated connection closed');
        }
      },
    );
  } catch (e) {
    _logger.debug('Authenticated EventSource connection error: $e');
    _isConnected = false;
    // Release the connection held by the failed attempt; without this the
    // HttpClient (and its socket) would stay open after the error.
    client.close(force: true);
    if (onError != null) {
      onError(e);
    }
    rethrow;
  }
}