connect method
Future<void>
connect(
- String url, {
- int compressionThreshold = 1024,
- dynamic onOpen(
- String?
)?,
- dynamic onMessage(
- dynamic
)?,
- dynamic onError(
- dynamic
)?,
- dynamic onCompressionNegotiated(
- CompressionType
)?,
})
Implementation
Future<void> connect(
String url, {
Map<String, String>? headers,
int compressionThreshold = 1024,
Function(String?)? onOpen,
Function(dynamic)? onMessage,
Function(dynamic)? onError,
Function(CompressionType)? onCompressionNegotiated,
}) async {
_logger.debug('CompressedEventSource connecting to: $url');
if (_isConnected) {
throw McpError('CompressedEventSource is already connected');
}
try {
_client = HttpClient();
_request = await _client!.getUrl(Uri.parse(url));
// Set compression and SSE headers
if (headers != null) {
headers.forEach((key, value) {
_request!.headers.set(key, value);
});
}
_response = await _request!.close();
if (_response!.statusCode != 200) {
final body = await _response!.transform(utf8.decoder).join();
throw McpError(
'Failed to connect to compressed SSE endpoint: ${_response!.statusCode} - $body',
);
}
// Check negotiated compression
final contentEncoding = _response!.headers.value('content-encoding');
_activeCompression = _parseCompressionType(contentEncoding);
if (onCompressionNegotiated != null) {
onCompressionNegotiated(_activeCompression);
}
_isConnected = true;
_logger.debug(
'Compressed EventSource connection established with encoding: ${_activeCompression.encoding}',
);
// Set up subscription to process compressed events
_subscription = _response!.listen(
(List<int> data) {
try {
// Decompress data if needed
List<int> decompressedData;
switch (_activeCompression) {
case CompressionType.gzip:
decompressedData = gzip.decode(data);
break;
case CompressionType.deflate:
decompressedData = zlib.decode(data);
break;
case CompressionType.brotli:
// Brotli decompression would go here
_logger.debug('Brotli decompression not implemented');
decompressedData = data;
break;
case CompressionType.none:
decompressedData = data;
break;
}
final chunk = utf8.decode(decompressedData, allowMalformed: true);
_logger.debug('Raw decompressed SSE data: [$chunk]');
_buffer.write(chunk);
// Process all events in buffer
final content = _buffer.toString();
// Check for JSON-RPC responses
if (content.contains('"jsonrpc":"2.0"') ||
content.contains('"jsonrpc": "2.0"')) {
_logger.debug('Detected JSON-RPC data in compressed SSE stream');
try {
final jsonStart = content.indexOf('{');
final jsonEnd = content.lastIndexOf('}') + 1;
if (jsonStart >= 0 && jsonEnd > jsonStart) {
final jsonStr = content.substring(jsonStart, jsonEnd);
_logger.debug(
'Extracted compressed JSON: ${jsonStr.substring(0, 100)}...',
);
try {
final jsonData = jsonDecode(jsonStr);
_logger.debug(
'Parsed compressed JSON-RPC data: ${jsonData.toString().substring(0, 100)}...',
);
// Clear processed data from buffer
if (jsonEnd < content.length) {
_buffer.clear();
_buffer.write(content.substring(jsonEnd));
} else {
_buffer.clear();
}
// Forward to message handler
if (onMessage != null) {
onMessage(jsonData);
}
return;
} catch (e) {
_logger.debug('JSON parse error in compressed stream: $e');
}
}
} catch (e) {
_logger.debug(
'Error extracting JSON from compressed stream: $e',
);
}
}
// Process SSE events
final event = _processBuffer();
_logger.debug(
'Processed compressed SSE event: ${event.event}, data: ${event.data}',
);
if (event.event == 'endpoint' && event.data != null) {
_logger.debug(
'Received compressed endpoint event: ${event.data}',
);
if (onOpen != null) {
onOpen(event.data);
}
} else if (event.data != null && onMessage != null) {
onMessage(event.data);
}
} catch (e) {
_logger.debug('Error processing compressed SSE data: $e');
}
},
onError: (e) {
_logger.debug('Compressed EventSource error: $e');
_isConnected = false;
if (onError != null) {
onError(e);
}
},
onDone: () {
_logger.debug('Compressed EventSource stream closed');
_isConnected = false;
if (onError != null) {
onError('Compressed connection closed');
}
},
);
} catch (e) {
_logger.debug('Compressed EventSource connection error: $e');
_isConnected = false;
if (onError != null) {
onError(e);
}
rethrow;
}
}