send method
Sends a message through the transport.
Implementation
/// Sends [message] to the message endpoint with an HTTP POST.
///
/// [message] is serialized with [jsonEncode]. When the encoded text is at
/// least `_compressionThreshold` characters long and a compression scheme
/// other than [CompressionType.none] has been negotiated, the UTF-8 body is
/// compressed and a `Content-Encoding` header is set accordingly.
///
/// Does nothing (beyond a debug log) if the transport is already closed.
///
/// Throws an [McpError] if the message endpoint has not been established or
/// if the server answers with a status code other than 200. Any other
/// failure raised while encoding or sending is logged and rethrown.
///
/// NOTE(review): this method is `async` but returns `void`, so callers
/// cannot await it and thrown errors surface as uncaught asynchronous
/// errors. The signature is kept unchanged because it overrides a
/// declaration that is not visible here.
@override
void send(dynamic message) async {
  if (_isClosed) {
    _logger.debug('Attempted to send on closed compressed transport');
    return;
  }

  // Copy the field to a local so it promotes to non-nullable without `!`.
  final endpoint = _messageEndpoint;
  if (endpoint == null) {
    throw McpError(
      'Cannot send message: Compressed SSE connection not established',
    );
  }

  try {
    final jsonMessage = jsonEncode(message);

    // The threshold is compared against the string length (UTF-16 code
    // units), which equals the byte count only for ASCII payloads.
    final shouldCompress = jsonMessage.length >= _compressionThreshold &&
        _negotiatedCompression != CompressionType.none;

    // Truncate the preview safely: calling substring(0, 100) on a shorter
    // string throws a RangeError and would abort the send.
    const previewLength = 100;
    final preview = jsonMessage.length > previewLength
        ? '${jsonMessage.substring(0, previewLength)}...'
        : jsonMessage;
    _logger.debug(
      'Sending message (${jsonMessage.length} bytes, '
      'compress: $shouldCompress): $preview',
    );

    final url = Uri.parse(endpoint);
    final client = HttpClient();
    try {
      final request = await client.postUrl(url);

      // Set content type, then layer the transport's base headers on top.
      request.headers.contentType = ContentType.json;
      for (final header in _baseHeaders.entries) {
        request.headers.add(header.key, header.value);
      }

      // Apply compression if needed and supported.
      List<int> bodyBytes = utf8.encode(jsonMessage);
      if (shouldCompress) {
        bodyBytes = await _compressData(bodyBytes, _negotiatedCompression);
        request.headers.set(
          'Content-Encoding',
          _negotiatedCompression.encoding,
        );
        _logger.debug(
          'Compressed message from ${jsonMessage.length} to '
          '${bodyBytes.length} bytes',
        );
      }

      // Send the request.
      request.add(bodyBytes);
      final response = await request.close();

      // The body is read for both outcomes so it can be logged.
      final responseBody = await _decompressResponse(response);
      if (response.statusCode != 200) {
        _logger.debug('Error response: $responseBody');
        throw McpError(
          'Error sending compressed message: ${response.statusCode}',
        );
      }
      _logger.debug(
        'Compressed message delivery confirmation: $responseBody',
      );
    } finally {
      // Release the client on every path, not only after a successful send.
      client.close();
    }

    _logger.debug('Compressed message sent successfully');
  } catch (e) {
    _logger.debug('Error sending compressed message: $e');
    rethrow;
  }
}