connect method

Future<void> connect(
  String url, {
  Map<String, String>? headers,
  dynamic onOpen(String?)?,
  dynamic onMessage(dynamic)?,
  dynamic onError(dynamic)?,
  dynamic onHeartbeat(Duration)?,
})

Implementation

/// Opens an HTTP GET connection to [url] and listens to the response body as
/// a heartbeat-aware SSE stream.
///
/// Every entry of [headers], when given, is set on the outgoing request.
///
/// Callbacks (all optional):
///  * [onOpen] receives the data of an `endpoint` event.
///  * [onMessage] receives either a decoded JSON-RPC payload found in the
///    buffered stream content, or the data of any other event that carries
///    data.
///  * [onHeartbeat] receives the latency computed from a `heartbeat` event.
///  * [onError] is invoked when connecting fails, when the stream reports an
///    error, and when the stream closes (with the string
///    `'Heartbeat connection closed'`).
///
/// Throws an [McpError] if this source is already connected or if the server
/// answers with a status other than 200. Any error raised while connecting is
/// passed to [onError] and then rethrown.
Future<void> connect(
  String url, {
  Map<String, String>? headers,
  Function(String?)? onOpen,
  Function(dynamic)? onMessage,
  Function(dynamic)? onError,
  Function(Duration)? onHeartbeat,
}) async {
  _logger.debug('HeartbeatEventSource connecting to: $url');
  if (_isConnected) {
    throw McpError('HeartbeatEventSource is already connected');
  }

  try {
    _client = HttpClient();
    _request = await _client!.getUrl(Uri.parse(url));

    // Set heartbeat and SSE headers
    if (headers != null) {
      for (final header in headers.entries) {
        _request!.headers.set(header.key, header.value);
      }
    }

    _response = await _request!.close();

    if (_response!.statusCode != 200) {
      final body = await _response!.transform(utf8.decoder).join();
      throw McpError(
        'Failed to connect to heartbeat SSE endpoint: ${_response!.statusCode} - $body',
      );
    }

    _isConnected = true;
    _logger.debug('Heartbeat EventSource connection established');

    // Set up subscription to process heartbeat events
    _subscription = _response!.listen(
      (List<int> data) {
        try {
          final chunk = utf8.decode(data, allowMalformed: true);
          _logger.debug('Raw heartbeat SSE data: [$chunk]');
          _buffer.write(chunk);

          // Snapshot of everything buffered so far; the checks below inspect
          // this snapshot rather than the live buffer.
          final content = _buffer.toString();

          // Check for heartbeat responses
          if (content.contains('event: heartbeat') ||
              content.contains('event:heartbeat')) {
            _logger.debug('Detected heartbeat event in SSE stream');

            // NOTE(review): if the event returned here is not a heartbeat, or
            // no onHeartbeat handler was supplied, it is not dispatched and
            // processing falls through to the checks below. Whether that
            // loses an event depends on _processBuffer - confirm.
            final event = _processBuffer();
            if (event.event == 'heartbeat' && onHeartbeat != null) {
              final latency = _calculateLatency(event.data);
              onHeartbeat(latency);
              return;
            }
          }

          // Check for JSON-RPC responses
          if (content.contains('"jsonrpc":"2.0"') ||
              content.contains('"jsonrpc": "2.0"')) {
            _logger.debug('Detected JSON-RPC data in heartbeat SSE stream');

            try {
              // Take the span from the first '{' to the last '}' as the
              // candidate JSON document.
              final jsonStart = content.indexOf('{');
              final jsonEnd = content.lastIndexOf('}') + 1;

              if (jsonStart >= 0 && jsonEnd > jsonStart) {
                final jsonStr = content.substring(jsonStart, jsonEnd);

                // Truncate only when needed: an unconditional
                // substring(0, 100) throws a RangeError for payloads shorter
                // than 100 characters, which would skip parsing entirely.
                final preview =
                    jsonStr.length > 100 ? jsonStr.substring(0, 100) : jsonStr;
                _logger.debug('Extracted heartbeat JSON: $preview...');

                try {
                  final jsonData = jsonDecode(jsonStr);
                  _logger.debug('Parsed heartbeat JSON-RPC data');

                  // Clear processed data from buffer, keeping whatever
                  // followed the extracted JSON for the next chunk.
                  _buffer.clear();
                  if (jsonEnd < content.length) {
                    _buffer.write(content.substring(jsonEnd));
                  }

                  // Forward to message handler
                  if (onMessage != null) {
                    onMessage(jsonData);
                  }
                  return;
                } catch (e) {
                  // Not (yet) valid JSON; fall through to regular SSE
                  // processing below.
                  _logger.debug('JSON parse error in heartbeat stream: $e');
                }
              }
            } catch (e) {
              _logger.debug(
                'Error extracting JSON from heartbeat stream: $e',
              );
            }
          }

          // Process regular SSE events
          final event = _processBuffer();
          _logger.debug(
            'Processed heartbeat SSE event: ${event.event}, data: ${event.data}',
          );

          if (event.event == 'endpoint' && event.data != null) {
            _logger.debug('Received heartbeat endpoint event: ${event.data}');
            if (onOpen != null) {
              onOpen(event.data);
            }
          } else if (event.data != null && onMessage != null) {
            onMessage(event.data);
          }
        } catch (e) {
          // Errors while handling a chunk are logged only, so the
          // subscription keeps receiving later chunks.
          _logger.debug('Error processing heartbeat SSE data: $e');
        }
      },
      onError: (e) {
        _logger.debug('Heartbeat EventSource error: $e');
        _isConnected = false;
        if (onError != null) {
          onError(e);
        }
      },
      onDone: () {
        _logger.debug('Heartbeat EventSource stream closed');
        _isConnected = false;
        // Stream completion is reported through the error callback.
        if (onError != null) {
          onError('Heartbeat connection closed');
        }
      },
    );
  } catch (e) {
    _logger.debug('Heartbeat EventSource connection error: $e');
    _isConnected = false;
    // Release the HTTP client created above so a failed attempt does not
    // leave an open client/socket behind.
    _client?.close(force: true);
    if (onError != null) {
      onError(e);
    }
    rethrow;
  }
}