connect method
Future<void>
connect(
- String url, {
- dynamic onOpen(
- String?
)?,
- dynamic onMessage(
- dynamic
)?,
- dynamic onError(
- dynamic
)?,
- dynamic onHeartbeat(
- Duration
)?,
})
Implementation
Future<void> connect(
String url, {
Map<String, String>? headers,
Function(String?)? onOpen,
Function(dynamic)? onMessage,
Function(dynamic)? onError,
Function(Duration)? onHeartbeat,
}) async {
_logger.debug('HeartbeatEventSource connecting to: $url');
if (_isConnected) {
throw McpError('HeartbeatEventSource is already connected');
}
try {
_client = HttpClient();
_request = await _client!.getUrl(Uri.parse(url));
// Set heartbeat and SSE headers
if (headers != null) {
headers.forEach((key, value) {
_request!.headers.set(key, value);
});
}
_response = await _request!.close();
if (_response!.statusCode != 200) {
final body = await _response!.transform(utf8.decoder).join();
throw McpError(
'Failed to connect to heartbeat SSE endpoint: ${_response!.statusCode} - $body',
);
}
_isConnected = true;
_logger.debug('Heartbeat EventSource connection established');
// Set up subscription to process heartbeat events
_subscription = _response!.listen(
(List<int> data) {
try {
final chunk = utf8.decode(data, allowMalformed: true);
_logger.debug('Raw heartbeat SSE data: [$chunk]');
_buffer.write(chunk);
// Process all events in buffer
final content = _buffer.toString();
// Check for heartbeat responses
if (content.contains('event: heartbeat') ||
content.contains('event:heartbeat')) {
_logger.debug('Detected heartbeat event in SSE stream');
final event = _processBuffer();
if (event.event == 'heartbeat' && onHeartbeat != null) {
final latency = _calculateLatency(event.data);
onHeartbeat(latency);
return;
}
}
// Check for JSON-RPC responses
if (content.contains('"jsonrpc":"2.0"') ||
content.contains('"jsonrpc": "2.0"')) {
_logger.debug('Detected JSON-RPC data in heartbeat SSE stream');
try {
final jsonStart = content.indexOf('{');
final jsonEnd = content.lastIndexOf('}') + 1;
if (jsonStart >= 0 && jsonEnd > jsonStart) {
final jsonStr = content.substring(jsonStart, jsonEnd);
_logger.debug(
'Extracted heartbeat JSON: ${jsonStr.substring(0, 100)}...',
);
try {
final jsonData = jsonDecode(jsonStr);
_logger.debug('Parsed heartbeat JSON-RPC data');
// Clear processed data from buffer
if (jsonEnd < content.length) {
_buffer.clear();
_buffer.write(content.substring(jsonEnd));
} else {
_buffer.clear();
}
// Forward to message handler
if (onMessage != null) {
onMessage(jsonData);
}
return;
} catch (e) {
_logger.debug('JSON parse error in heartbeat stream: $e');
}
}
} catch (e) {
_logger.debug(
'Error extracting JSON from heartbeat stream: $e',
);
}
}
// Process regular SSE events
final event = _processBuffer();
_logger.debug(
'Processed heartbeat SSE event: ${event.event}, data: ${event.data}',
);
if (event.event == 'endpoint' && event.data != null) {
_logger.debug('Received heartbeat endpoint event: ${event.data}');
if (onOpen != null) {
onOpen(event.data);
}
} else if (event.data != null && onMessage != null) {
onMessage(event.data);
}
} catch (e) {
_logger.debug('Error processing heartbeat SSE data: $e');
}
},
onError: (e) {
_logger.debug('Heartbeat EventSource error: $e');
_isConnected = false;
if (onError != null) {
onError(e);
}
},
onDone: () {
_logger.debug('Heartbeat EventSource stream closed');
_isConnected = false;
if (onError != null) {
onError('Heartbeat connection closed');
}
},
);
} catch (e) {
_logger.debug('Heartbeat EventSource connection error: $e');
_isConnected = false;
if (onError != null) {
onError(e);
}
rethrow;
}
}