send method
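Sends a JSON-RPC message through the transport: responses are delivered to the request waiting on their id, and all other messages are written to the open SSE streams.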
Implementation
/// Sends [message] through the transport.
///
/// Returns immediately when `_isClosed` is set. Otherwise routing depends on
/// the shape of [message], which must be a `Map`:
///
/// * A JSON-RPC response (it has an `id` and no `method`) is handed to
///   whatever is waiting on that id: the pending completer in sync JSON
///   mode, the polling response store in async JSON mode, or the POST SSE
///   stream registered under that id when JSON responses are disabled.
/// * Anything else (a notification or a server-initiated request) is written
///   to the GET stream of the session named by its `_targetSessionId` entry.
///   Without a target it is broadcast to every GET stream, falling back to
///   the active POST SSE streams when there is no GET stream.
///
/// Every failure is logged; nothing is thrown to the caller.
@override
void send(dynamic message) {
  if (_isClosed) return;

  try {
    _logger.debug('StreamableHTTP send() called with message: $message');

    // Both branches below copy the message with Map.from, which cannot work
    // for any other payload. Reject it here with a specific diagnostic
    // instead of consuming an event id and failing halfway through.
    if (message is! Map) {
      _logger.error(
          'Error sending message: expected a Map but got ${message.runtimeType}');
      return;
    }

    // A message is a JSON-RPC RESPONSE only when it has an id AND no
    // method (i.e. it carries `result` or `error`). A server-INITIATED
    // request (e.g., `sampling/createMessage`, `elicitation/create`)
    // also has an id but is NOT a response — it must be routed to the
    // standalone GET stream so the client can pick it up. Checking only
    // `containsKey('id')` would send every id-bearing message into the
    // response-completion path and drop outbound server-initiated requests
    // with `No pending request found for response with ID: srv-N`.
    final isResponse =
        message.containsKey('id') && !message.containsKey('method');

    if (isResponse) {
      final requestId = message['id'];
      _logger.debug('Response ID: $requestId (type: ${requestId.runtimeType})');

      // Record the event under a fresh id so it can be looked up later
      // (resumability).
      final eventId = (_eventIdCounter++).toString();
      _eventStore[eventId] = EventMessage(
        message: Map<String, dynamic>.from(message),
        eventId: eventId,
      );

      if (config.isJsonResponseEnabled) {
        if (config.jsonResponseMode == 'sync') {
          // Synchronous JSON mode: complete the pending completer.
          // The entry is taken out of the map BEFORE completing it, so it is
          // released even when complete() throws (for example because the
          // completer was already completed) and cannot linger as a stale
          // pending entry.
          final completer = _pendingCompleters.remove(requestId);
          if (completer != null) {
            try {
              _logger.debug('Completing completer for request ID: $requestId');
              completer.complete(Map<String, dynamic>.from(message));
              _logger.debug('Successfully completed completer for request ID: $requestId');
            } catch (e, stackTrace) {
              _logger.error('Error completing completer for request ID $requestId: $e');
              _logger.debug('Stack trace: $stackTrace');
            }
          } else {
            _logger.warning('No pending completer for response ID: $requestId');
            _logger.debug('Available completers: ${_pendingCompleters.keys.toList()}');
          }
        } else {
          // Asynchronous JSON mode: store the response for polling, keyed by
          // the session id recorded on the pending request plus the
          // request id.
          final pendingRequest = _pendingRequests[requestId];
          if (pendingRequest != null) {
            final responseKey = '${pendingRequest.sessionId}:$requestId';
            _responseStore[responseKey] = Map<String, dynamic>.from(message);
            _responseTimestamps[responseKey] = DateTime.now();
            _pendingRequests.remove(requestId);
          } else {
            _logger.warning('No pending request found for async response ID: $requestId');
          }
        }
      } else {
        final stream = _sseStreams[requestId];
        if (stream != null) {
          // SSE response for a specific request.
          _sendSseEvent(
            stream.controller,
            Map<String, dynamic>.from(message),
            eventId: eventId,
          );

          // A result or error is the final event for this request, so the
          // stream and its router are torn down.
          if (message.containsKey('result') || message.containsKey('error')) {
            stream.controller.close();
            _sseStreams.remove(requestId);
            _messageRouters.remove(requestId)?.close();
          }
        } else {
          // Nothing is waiting for this response; log what is pending to
          // help diagnose the mismatch.
          _logger.warning('No pending request found for response with ID: $requestId');
          _logger.debug('Current pending requests: ${_pendingRequests.keys.toList()}');
          _logger.debug('Current SSE streams: ${_sseStreams.keys.toList()}');
        }
      }
    } else {
      // Notification or server-initiated message.
      final eventId = (_eventIdCounter++).toString();

      // Strip internal routing metadata before storing and sending.
      final cleanMessage = Map<String, dynamic>.from(message);
      final targetSessionId =
          cleanMessage.remove('_targetSessionId') as String?;

      _eventStore[eventId] = EventMessage(
        message: cleanMessage,
        eventId: eventId,
      );

      var sent = false;

      if (targetSessionId == null) {
        // Broadcast mode: send to all GET streams.
        for (final entry in _getStreams.entries) {
          _logger.debug('📤 Broadcasting notification to GET stream (session: ${entry.key})');
          _sendSseEvent(entry.value.controller, cleanMessage, eventId: eventId);
          sent = true;
        }

        // If no GET stream is available, send to all active POST SSE streams.
        if (!sent && _sseStreams.isNotEmpty) {
          _logger.debug('No GET stream available, sending to ${_sseStreams.length} active POST SSE streams');
          for (final entry in _sseStreams.entries) {
            _logger.debug('📤 Broadcasting notification to POST SSE stream (requestId: ${entry.key})');
            _sendSseEvent(entry.value.controller, cleanMessage, eventId: eventId);
            sent = true;
          }
        }
      } else {
        // Targeted mode: deliver only to that session's GET stream; there is
        // no fallback to other streams.
        final targetStream = _getStreams[targetSessionId];
        if (targetStream != null) {
          _logger.debug('📤 Sending notification to target session: $targetSessionId');
          _sendSseEvent(targetStream.controller, cleanMessage, eventId: eventId);
          sent = true;
        } else {
          _logger.warning('Target session $targetSessionId not found or no GET stream available');
        }
      }

      if (!sent) {
        _logger.debug('No streams available to send notification');
      }
    }
  } catch (e, stackTrace) {
    // Best-effort delivery: report the failure instead of propagating it.
    _logger.error('Error sending message: $e');
    _logger.debug('Stack trace: $stackTrace');
  }
}