create static method
Create SSE transport with compression negotiation
Implementation
static Future<SseCompressedClientTransport> create({
required String serverUrl,
Map<String, String>? headers,
List<CompressionType>? supportedCompressions,
int compressionThreshold = 1024,
}) async {
final transport = SseCompressedClientTransport._internal(
serverUrl: serverUrl,
headers: headers,
supportedCompressions: supportedCompressions,
compressionThreshold: compressionThreshold,
);
try {
// Set up compression negotiation headers
final compressionHeaders = Map<String, String>.from(
transport._baseHeaders,
);
// Add Accept-Encoding for compression negotiation
final acceptEncodings = transport._supportedCompressions
.map((c) => c.encoding)
.join(', ');
compressionHeaders['Accept-Encoding'] = acceptEncodings;
// Add standard SSE headers
compressionHeaders['Accept'] = 'text/event-stream';
compressionHeaders['Cache-Control'] = 'no-cache';
compressionHeaders['Connection'] = 'keep-alive';
_logger.debug('Connecting with compression support: $acceptEncodings');
// Set up event handlers
final endpointCompleter = Completer<String>();
await transport._eventSource.connect(
serverUrl,
headers: compressionHeaders,
compressionThreshold: transport._compressionThreshold,
onOpen: (endpoint) {
if (!endpointCompleter.isCompleted && endpoint != null) {
endpointCompleter.complete(endpoint);
}
},
onMessage: (data) {
if (!transport._messageController.isClosed) {
transport._messageController.add(data);
}
},
onError: (e) {
_logger.debug('SSE compression error: $e');
if (!endpointCompleter.isCompleted) {
endpointCompleter.completeError(e);
}
transport._handleError(e);
},
onCompressionNegotiated: (compression) {
transport._negotiatedCompression = compression;
_logger.debug('Compression negotiated: ${compression.encoding}');
},
);
// Wait for endpoint
final endpointPath = await endpointCompleter.future.timeout(
Duration(seconds: 15),
onTimeout:
() => throw McpError('Timed out waiting for compressed endpoint'),
);
transport._messageEndpoint =
endpointPath.startsWith('http')
? endpointPath
: transport._constructEndpointUrl(
Uri.parse(serverUrl),
endpointPath,
);
_logger.debug(
'Compressed SSE transport ready: ${transport._messageEndpoint}',
);
_logger.debug(
'Active compression: ${transport._negotiatedCompression.encoding}',
);
return transport;
} catch (e) {
transport.close();
throw McpError('Failed to establish compressed SSE connection: $e');
}
}