sendToDestination method

Future<void> sendToDestination({
  1. required String destination,
  2. String? body,
  3. Uint8List? bodyBytes,
  4. String? contentType,
  5. Map<String, String>? headers,
})

Sends a message to a destination

Implementation

Future<void> sendToDestination({
  required String destination,
  String? body,
  Uint8List? bodyBytes,
  String? contentType,
  Map<String, String>? headers,
}) async {
  if (!_isRunning) {
    throw const StompStateException('Server not running', 'stopped', 'running');
  }

  final messageId = _generateMessageId();
  final messageHeaders = <String, String>{
    StompHeaders.destination: destination,
    StompHeaders.messageId: messageId,
  };

  if (contentType != null) messageHeaders[StompHeaders.contentType] = contentType;
  if (headers != null) messageHeaders.addAll(headers);

  Uint8List? frameBody;
  if (body != null) {
    frameBody = Uint8List.fromList(body.codeUnits);
  } else if (bodyBytes != null) {
    frameBody = bodyBytes;
  }

  if (frameBody != null) {
    messageHeaders[StompHeaders.contentLength] = frameBody.length.toString();
  }

  // Store message for the destination
  final messageFrame = StompFrame(
    command: StompCommands.message,
    headers: messageHeaders,
    body: frameBody,
  );

  _destinationMessages.putIfAbsent(destination, () => <StompFrame>[]).add(messageFrame);

  // Send to all subscribers
  final subscribers = _destinationSubscriptions[destination];
  if (subscribers != null) {
    for (final connection in subscribers) {
      if (connection.isActive) {
        await _sendMessageToConnection(connection, messageFrame, destination);
      }
    }
  }

  // Emit message event
  final message = StompMessage.fromFrame(messageFrame);
  _messageController.add(message);

  _logger.info('Sent message to destination $destination (${subscribers?.length ?? 0} subscribers)');
}