connect method
Connect to an SSE endpoint
Implementation
@override
Future<void> connect(
String url, {
Map<String, String>? headers,
Function(String?)? onOpen,
Function(dynamic)? onMessage,
Function(dynamic)? onError,
Function(String?)? onEndpoint,
Function(int, String)? onAuthFailure,
}) async {
_logger.debug('AuthenticatedEventSource connecting to: $url');
if (_isConnected) {
throw McpError('AuthenticatedEventSource is already connected');
}
try {
_client = HttpClient();
_request = await _client!.getUrl(Uri.parse(url));
// Set authentication and SSE headers
if (headers != null) {
headers.forEach((key, value) {
_request!.headers.set(key, value);
});
}
_response = await _request!.close();
// Handle authentication failures
if (_response!.statusCode == 401 || _response!.statusCode == 403) {
final body = await _response!.transform(utf8.decoder).join();
_logger.debug(
'Authentication failed: ${_response!.statusCode} - $body',
);
if (onAuthFailure != null) {
onAuthFailure(_response!.statusCode, body);
}
throw McpError('Authentication failed: ${_response!.statusCode}');
}
if (_response!.statusCode != 200) {
final body = await _response!.transform(utf8.decoder).join();
throw McpError(
'Failed to connect to authenticated SSE endpoint: ${_response!.statusCode} - $body',
);
}
_isConnected = true;
_logger.debug('Authenticated EventSource connection established');
// Set up subscription to process events
_subscription = _response!.listen(
(List<int> data) {
try {
final chunk = utf8.decode(data, allowMalformed: true);
_logger.debug('Raw authenticated SSE data: [$chunk]');
_buffer.write(chunk);
// Process all events in buffer
final content = _buffer.toString();
// Check for JSON-RPC responses
if (content.contains('"jsonrpc":"2.0"') ||
content.contains('"jsonrpc": "2.0"')) {
_logger.debug(
'Detected JSON-RPC data in authenticated SSE stream',
);
try {
final jsonStart = content.indexOf('{');
final jsonEnd = content.lastIndexOf('}') + 1;
if (jsonStart >= 0 && jsonEnd > jsonStart) {
final jsonStr = content.substring(jsonStart, jsonEnd);
_logger.debug('Extracted authenticated JSON: $jsonStr');
try {
final jsonData = jsonDecode(jsonStr);
_logger.debug(
'Parsed authenticated JSON-RPC data: $jsonData',
);
// Clear processed data from buffer
if (jsonEnd < content.length) {
_buffer.clear();
_buffer.write(content.substring(jsonEnd));
} else {
_buffer.clear();
}
// Forward to message handler
if (onMessage != null) {
onMessage(jsonData);
}
return;
} catch (e) {
_logger.debug(
'JSON parse error in authenticated stream: $e',
);
}
}
} catch (e) {
_logger.debug(
'Error extracting JSON from authenticated stream: $e',
);
}
}
// Process SSE events
final event = _processBuffer();
_logger.debug(
'Processed authenticated SSE event: ${event.event}, data: ${event.data}',
);
if (event.event == 'endpoint' && event.data != null) {
_logger.debug(
'Received authenticated endpoint event: ${event.data}',
);
if (onOpen != null) {
onOpen(event.data);
}
} else if (event.data != null && onMessage != null) {
onMessage(event.data);
}
} catch (e) {
_logger.debug('Error processing authenticated SSE data: $e');
}
},
onError: (e) {
_logger.debug('Authenticated EventSource error: $e');
_isConnected = false;
if (onError != null) {
onError(e);
}
},
onDone: () {
_logger.debug('Authenticated EventSource stream closed');
_isConnected = false;
if (onError != null) {
onError('Authenticated connection closed');
}
},
);
} catch (e) {
_logger.debug('Authenticated EventSource connection error: $e');
_isConnected = false;
if (onError != null) {
onError(e);
}
rethrow;
}
}