create static method
Implementation
static Future<SseClientTransport> create({
required String serverUrl,
Map<String, String>? headers,
}) async {
final transport = SseClientTransport._internal(
serverUrl: serverUrl,
headers: headers,
);
try {
// Generate session ID for MCP standard compliance
final sessionId = _generateSessionId();
final sseUrlWithSession =
serverUrl.contains('?')
? '$serverUrl&session_id=$sessionId'
: '$serverUrl?session_id=$sessionId';
_logger.debug('SSE URL with session: $sseUrlWithSession');
// Set up event handlers
final endpointCompleter = Completer<String>();
await transport._eventSource.connect(
sseUrlWithSession,
headers: headers,
onMessage: (data) {
// This is crucial - forward messages to the controller
if (data is Map &&
data.containsKey('jsonrpc') &&
data.containsKey('id') &&
!transport._messageController.isClosed) {
_logger.debug('Forwarding JSON-RPC response: $data');
transport._messageController.add(data);
} else if (!transport._messageController.isClosed) {
transport._messageController.add(data);
}
},
onError: (e) {
_logger.debug('SSE error: $e');
if (!endpointCompleter.isCompleted) {
endpointCompleter.completeError(e);
}
transport._handleError(e);
},
onEndpoint: (endpoint) {
_logger.debug('Received endpoint from SSE: $endpoint');
if (!endpointCompleter.isCompleted && endpoint != null) {
endpointCompleter.complete(endpoint);
}
},
);
// Wait for endpoint
final endpointPath = await endpointCompleter.future.timeout(
Duration(seconds: 10),
onTimeout: () => throw McpError('Timed out waiting for endpoint'),
);
// Set up message endpoint following MCP standard
transport._messageEndpoint =
endpointPath.startsWith('http')
? endpointPath
: transport._constructEndpointUrl(
Uri.parse(serverUrl),
endpointPath,
);
_logger.debug(
'Transport ready with MCP standard endpoint: ${transport._messageEndpoint}',
);
return transport;
} catch (e) {
transport.close();
throw McpError('Failed to establish SSE connection: $e');
}
}