connect method

Future<void> connect(
  String url, {
  Map<String, String>? headers,
  int compressionThreshold = 1024,
  dynamic onOpen(String?)?,
  dynamic onMessage(dynamic)?,
  dynamic onError(dynamic)?,
  dynamic onCompressionNegotiated(CompressionType)?,
})

Implementation

/// Opens a compressed SSE connection to [url] and starts dispatching events.
///
/// Issues an HTTP GET to [url], applying every entry of [headers] to the
/// request when provided. The compression in use is derived from the
/// response's `content-encoding` header and reported through
/// [onCompressionNegotiated] before the stream subscription is set up.
///
/// Callbacks:
/// * [onOpen] receives the data of an `endpoint` event.
/// * [onMessage] receives either a decoded JSON-RPC payload found in the
///   buffered stream content, or the data of any other event.
/// * [onError] receives stream errors, a `'Compressed connection closed'`
///   notice when the stream ends, and any failure raised while connecting
///   (which is also rethrown to the caller).
///
/// [compressionThreshold] is accepted for interface compatibility but is not
/// read by this method.
///
/// Throws [McpError] if the source is already connected or if the server
/// answers with a status code other than 200.
Future<void> connect(
  String url, {
  Map<String, String>? headers,
  int compressionThreshold = 1024,
  Function(String?)? onOpen,
  Function(dynamic)? onMessage,
  Function(dynamic)? onError,
  Function(CompressionType)? onCompressionNegotiated,
}) async {
  _logger.debug('CompressedEventSource connecting to: $url');
  if (_isConnected) {
    throw McpError('CompressedEventSource is already connected');
  }

  // Shortens [text] for log output. An unconditional `substring(0, 100)`
  // throws a RangeError on payloads shorter than 100 characters, which used
  // to abort handling of short JSON-RPC messages.
  String preview(String text) =>
      text.length > 100 ? text.substring(0, 100) : text;

  try {
    final client = HttpClient();
    _client = client;
    final request = await client.getUrl(Uri.parse(url));
    _request = request;

    // Apply caller-supplied headers (e.g. compression and SSE headers).
    if (headers != null) {
      for (final entry in headers.entries) {
        request.headers.set(entry.key, entry.value);
      }
    }

    final response = await request.close();
    _response = response;

    if (response.statusCode != 200) {
      final body = await response.transform(utf8.decoder).join();
      throw McpError(
        'Failed to connect to compressed SSE endpoint: ${response.statusCode} - $body',
      );
    }

    // Check negotiated compression
    final contentEncoding = response.headers.value('content-encoding');
    _activeCompression = _parseCompressionType(contentEncoding);

    // HttpClient transparently decompresses gzip bodies when autoUncompress
    // is enabled (the default) but leaves the headers untouched. In that case
    // the bytes delivered to the listener are already plain and must not be
    // decoded a second time.
    final alreadyDecompressed = response.compressionState ==
        HttpClientResponseCompressionState.decompressed;

    onCompressionNegotiated?.call(_activeCompression);

    _isConnected = true;
    _logger.debug(
      'Compressed EventSource connection established with encoding: ${_activeCompression.encoding}',
    );

    // Set up subscription to process compressed events
    _subscription = response.listen(
      (List<int> data) {
        try {
          // Decompress data if needed
          List<int> decompressedData;
          if (alreadyDecompressed) {
            decompressedData = data;
          } else {
            switch (_activeCompression) {
              case CompressionType.gzip:
                decompressedData = gzip.decode(data);
                break;
              case CompressionType.deflate:
                decompressedData = zlib.decode(data);
                break;
              case CompressionType.brotli:
                // Brotli decompression would go here
                _logger.debug('Brotli decompression not implemented');
                decompressedData = data;
                break;
              case CompressionType.none:
                decompressedData = data;
                break;
            }
          }

          final chunk = utf8.decode(decompressedData, allowMalformed: true);
          _logger.debug('Raw decompressed SSE data: [$chunk]');
          _buffer.write(chunk);

          // Process all events in buffer
          final content = _buffer.toString();

          // Check for JSON-RPC responses
          if (content.contains('"jsonrpc":"2.0"') ||
              content.contains('"jsonrpc": "2.0"')) {
            _logger.debug('Detected JSON-RPC data in compressed SSE stream');

            // Take everything from the first '{' to the last '}' as the
            // candidate JSON document.
            final jsonStart = content.indexOf('{');
            final jsonEnd = content.lastIndexOf('}') + 1;

            if (jsonStart >= 0 && jsonEnd > jsonStart) {
              final jsonStr = content.substring(jsonStart, jsonEnd);
              _logger.debug(
                'Extracted compressed JSON: ${preview(jsonStr)}...',
              );

              Object? jsonData;
              var parsed = false;
              try {
                jsonData = jsonDecode(jsonStr);
                parsed = true;
              } on FormatException catch (e) {
                // Not (yet) valid JSON: fall through to SSE processing.
                _logger.debug('JSON parse error in compressed stream: $e');
              }

              if (parsed) {
                _logger.debug(
                  'Parsed compressed JSON-RPC data: ${preview('$jsonData')}...',
                );

                // Clear processed data from buffer, keeping any trailing
                // content that follows the JSON document.
                _buffer.clear();
                if (jsonEnd < content.length) {
                  _buffer.write(content.substring(jsonEnd));
                }

                // Forward to message handler
                onMessage?.call(jsonData);
                return;
              }
            }
          }

          // Process SSE events
          final event = _processBuffer();
          _logger.debug(
            'Processed compressed SSE event: ${event.event}, data: ${event.data}',
          );

          if (event.event == 'endpoint' && event.data != null) {
            _logger.debug(
              'Received compressed endpoint event: ${event.data}',
            );
            onOpen?.call(event.data);
          } else if (event.data != null) {
            onMessage?.call(event.data);
          }
        } catch (e) {
          // Best effort: a bad chunk must not tear down the subscription.
          _logger.debug('Error processing compressed SSE data: $e');
        }
      },
      onError: (e) {
        _logger.debug('Compressed EventSource error: $e');
        _isConnected = false;
        onError?.call(e);
      },
      onDone: () {
        _logger.debug('Compressed EventSource stream closed');
        _isConnected = false;
        onError?.call('Compressed connection closed');
      },
    );
  } catch (e) {
    _logger.debug('Compressed EventSource connection error: $e');
    _isConnected = false;
    // Release the HTTP client so a failed attempt does not leak sockets.
    _client?.close(force: true);
    _client = null;
    onError?.call(e);
    rethrow;
  }
}