create static method

Future<SseCompressedClientTransport> create({
  required String serverUrl,
  Map<String, String>? headers,
  List<CompressionType>? supportedCompressions,
  int compressionThreshold = 1024,
})

Creates an SSE transport with compression negotiation.

Implementation

/// Creates an [SseCompressedClientTransport], connects it to [serverUrl], and
/// waits for the server to announce its message endpoint.
///
/// The connection request is sent with a copy of the transport's base headers
/// plus the following entries, which overwrite any same-named base header:
///
/// * `Accept-Encoding`: the encodings of the transport's supported
///   compressions, joined with `', '`.
/// * `Accept: text/event-stream`
/// * `Cache-Control: no-cache`
/// * `Connection: keep-alive`
///
/// [headers], [supportedCompressions] and [compressionThreshold] are forwarded
/// to the private constructor; how they map onto the base headers and the
/// supported-compression list is decided there.
///
/// The endpoint reported by the event source is used verbatim when it starts
/// with `http`; otherwise it is resolved against [serverUrl] via
/// `_constructEndpointUrl`.
///
/// Throws an [McpError] if connecting fails, if the event source reports an
/// error before the endpoint arrives, or if no endpoint arrives within
/// 15 seconds. The transport is closed before the error is thrown, and the
/// stack trace of the underlying failure is preserved.
static Future<SseCompressedClientTransport> create({
  required String serverUrl,
  Map<String, String>? headers,
  List<CompressionType>? supportedCompressions,
  int compressionThreshold = 1024,
}) async {
  final transport = SseCompressedClientTransport._internal(
    serverUrl: serverUrl,
    headers: headers,
    supportedCompressions: supportedCompressions,
    compressionThreshold: compressionThreshold,
  );

  try {
    // Start from a copy so the transport's own header map is not mutated.
    final compressionHeaders = Map<String, String>.from(
      transport._baseHeaders,
    );

    // Advertise every supported encoding for compression negotiation.
    final acceptEncodings = transport._supportedCompressions
        .map((c) => c.encoding)
        .join(', ');
    compressionHeaders['Accept-Encoding'] = acceptEncodings;

    // Standard SSE request headers.
    compressionHeaders['Accept'] = 'text/event-stream';
    compressionHeaders['Cache-Control'] = 'no-cache';
    compressionHeaders['Connection'] = 'keep-alive';

    _logger.debug('Connecting with compression support: $acceptEncodings');

    // Completed by onOpen with the endpoint, or by onError with the failure.
    final endpointCompleter = Completer<String>();

    // Nothing listens to the completer until connect() has returned. If
    // onError completes it with an error and connect() then throws, that
    // error would otherwise be reported as an uncaught asynchronous error.
    // ignore() registers a no-op listener; the await below still sees it.
    endpointCompleter.future.ignore();

    await transport._eventSource.connect(
      serverUrl,
      headers: compressionHeaders,
      compressionThreshold: transport._compressionThreshold,
      onOpen: (endpoint) {
        // A null endpoint leaves the completer pending, so the timeout
        // below is what eventually fails the call.
        if (!endpointCompleter.isCompleted && endpoint != null) {
          endpointCompleter.complete(endpoint);
        }
      },
      onMessage: (data) {
        if (!transport._messageController.isClosed) {
          transport._messageController.add(data);
        }
      },
      onError: (e) {
        _logger.debug('SSE compression error: $e');
        // Only errors arriving before the endpoint fail create(); later
        // ones are reported through the transport's own error handler.
        if (!endpointCompleter.isCompleted) {
          endpointCompleter.completeError(e);
        }
        transport._handleError(e);
      },
      onCompressionNegotiated: (compression) {
        transport._negotiatedCompression = compression;
        _logger.debug('Compression negotiated: ${compression.encoding}');
      },
    );

    // Wait for the server to announce where messages should be sent.
    final endpointPath = await endpointCompleter.future.timeout(
      const Duration(seconds: 15),
      onTimeout:
          () => throw McpError('Timed out waiting for compressed endpoint'),
    );

    // Absolute URLs are used as-is; anything else is resolved against
    // the server URL.
    transport._messageEndpoint =
        endpointPath.startsWith('http')
            ? endpointPath
            : transport._constructEndpointUrl(
              Uri.parse(serverUrl),
              endpointPath,
            );

    _logger.debug(
      'Compressed SSE transport ready: ${transport._messageEndpoint}',
    );
    _logger.debug(
      'Active compression: ${transport._negotiatedCompression.encoding}',
    );

    return transport;
  } catch (e, stackTrace) {
    // NOTE(review): close() is not awaited here; if it returns a Future,
    // confirm that fire-and-forget cleanup is intended.
    transport.close();
    // Wrap in McpError as before, but keep the stack trace of the original
    // failure so the root cause stays debuggable.
    Error.throwWithStackTrace(
      McpError('Failed to establish compressed SSE connection: $e'),
      stackTrace,
    );
  }
}