send method

  1. @override
void send(
  1. dynamic message
)
override

Send a message through the transport

Implementation

@override
void send(dynamic message) {
  if (_isClosed) return;

  try {
    _logger.debug('StreamableHTTP send() called with message: $message');

    // A message is a JSON-RPC RESPONSE only when it has an id AND no
    // method (i.e. it carries `result` or `error`). A server-INITIATED
    // request (e.g., `sampling/createMessage`, `elicitation/create`)
    // also has an id but is NOT a response — it must be routed to the
    // standalone GET stream so the client can pick it up. The earlier
    // `containsKey('id')` check sent every id-bearing message into
    // the response-completion path, which dropped outbound
    // server-initiated requests on the floor with `No pending request
    // found for response with ID: srv-N`.
    final isResponse = message is Map &&
        message.containsKey('id') &&
        !message.containsKey('method');

    if (isResponse) {
      final requestId = message['id'];
      _logger.debug('Response ID: $requestId (type: ${requestId.runtimeType})');

      // Generate event ID for resumability
      final eventId = (_eventIdCounter++).toString();

      // Store event for resumability
      _eventStore[eventId] = EventMessage(
        message: Map<String, dynamic>.from(message),
        eventId: eventId,
      );

      // Handle based on mode
      if (config.isJsonResponseEnabled) {
        if (config.jsonResponseMode == 'sync') {
          // Synchronous JSON mode: complete the pending completer
          if (_pendingCompleters.containsKey(requestId)) {
            try {
              _logger.debug('Completing completer for request ID: $requestId');
              _pendingCompleters[requestId]!.complete(Map<String, dynamic>.from(message));
              _pendingCompleters.remove(requestId);
              _logger.debug('Successfully completed completer for request ID: $requestId');
            } catch (e, stackTrace) {
              _logger.error('Error completing completer for request ID $requestId: $e');
              _logger.debug('Stack trace: $stackTrace');
            }
          } else {
            _logger.warning('No pending completer for response ID: $requestId');
            _logger.debug('Available completers: ${_pendingCompleters.keys.toList()}');
          }
        } else {
          // Asynchronous JSON mode: store response for polling
          // Get sessionId from pending request
          final pendingRequest = _pendingRequests[requestId];
          if (pendingRequest != null) {
            final responseKey = '${pendingRequest.sessionId}:$requestId';
            _responseStore[responseKey] = Map<String, dynamic>.from(message);
            _responseTimestamps[responseKey] = DateTime.now();
            _pendingRequests.remove(requestId);
          } else {
            _logger.warning('No pending request found for async response ID: $requestId');
          }
        }
      } else if (_sseStreams.containsKey(requestId)) {
        // SSE response for specific request
        final stream = _sseStreams[requestId]!;
        _sendSseEvent(stream.controller, Map<String, dynamic>.from(message), eventId: eventId);

        // If this is a response or error, close the stream
        if (message.containsKey('result') || message.containsKey('error')) {
          stream.controller.close();
          _sseStreams.remove(requestId);
          _messageRouters.remove(requestId)?.close();
        }
      } else {
        // Log when we can't find a pending request for a response
        _logger.warning('No pending request found for response with ID: $requestId');
        _logger.debug('Current pending requests: ${_pendingRequests.keys.toList()}');
        _logger.debug('Current SSE streams: ${_sseStreams.keys.toList()}');
      }
    } else {
      // Notification or server-initiated message
      final eventId = (_eventIdCounter++).toString();

      // Remove internal metadata before storing and sending
      final cleanMessage = Map<String, dynamic>.from(message);
      final targetSessionId = cleanMessage.remove('_targetSessionId') as String?;

      _eventStore[eventId] = EventMessage(
        message: cleanMessage,
        eventId: eventId,
      );

      var sent = false;

      // If target session is specified, send only to that session's GET stream
      if (targetSessionId != null && _getStreams.containsKey(targetSessionId)) {
        _logger.debug('📤 Sending notification to target session: $targetSessionId');
        _sendSseEvent(_getStreams[targetSessionId]!.controller, cleanMessage, eventId: eventId);
        sent = true;
      } else if (targetSessionId == null) {
        // Broadcast mode: send to all GET streams
        for (final entry in _getStreams.entries) {
          _logger.debug('📤 Broadcasting notification to GET stream (session: ${entry.key})');
          _sendSseEvent(entry.value.controller, cleanMessage, eventId: eventId);
          sent = true;
        }

        // If no GET stream available, send to all active POST SSE streams
        if (!sent && _sseStreams.isNotEmpty) {
          _logger.debug('No GET stream available, sending to ${_sseStreams.length} active POST SSE streams');
          for (final entry in _sseStreams.entries) {
            _logger.debug('📤 Broadcasting notification to POST SSE stream (requestId: ${entry.key})');
            _sendSseEvent(entry.value.controller, cleanMessage, eventId: eventId);
            sent = true;
          }
        }
      } else {
        _logger.warning('Target session $targetSessionId not found or no GET stream available');
      }

      if (!sent) {
        _logger.debug('No streams available to send notification');
      }
    }
  } catch (e, stackTrace) {
    _logger.error('Error sending message: $e');
    _logger.debug('Stack trace: $stackTrace');
  }
}