listen method

@override
void listen(
  String? correlationId,
  IMessageReceiver receiver
)

Listens for incoming messages and blocks the current thread until the queue is closed.

  • correlationId: (optional) transaction id used to trace execution through the call chain.
  • receiver: the receiver that handles incoming messages.

See IMessageReceiver, receive

Implementation

@override
void listen(String? correlationId, IMessageReceiver receiver) async {
  checkOpen(correlationId);

  // Subscribe to topic if needed
  await subscribe(correlationId);
  _logger.trace(null, 'Started listening messages at %s', [getName()]);

  // Resend collected messages to receiver
  while (isOpen() && _messages.isNotEmpty) {
    var message = _messages.isNotEmpty ? _messages.removeAt(0) : null;
    if (message != null) {
      sendMessageToReceiver(receiver, message);
    }
  }

  // Set the receiver so later messages are delivered to it directly
  if (isOpen()) {
    _receiver = receiver;
  }
}
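
The implementation above follows a buffer-then-forward pattern: messages that arrived before a receiver was attached are replayed in arrival order, and only then is the receiver stored for direct delivery. The following is a minimal, synchronous sketch of that pattern alone. `BufferingQueue`, `Receiver`, `onMessage`, and `demo` are hypothetical names invented for illustration and are not part of the library; the real method also subscribes to a topic and works with message envelopes, which this sketch omits.

```dart
// Hypothetical stand-in for a message receiver; not the library's IMessageReceiver.
typedef Receiver = void Function(String message);

// Hypothetical in-memory queue that buffers messages until a receiver is attached.
class BufferingQueue {
  final List<String> _pending = [];
  Receiver? _receiver;
  bool _open = true;

  bool get isOpen => _open;

  // Simulates a message arriving from the broker.
  void onMessage(String message) {
    if (!_open) return;
    final receiver = _receiver;
    if (receiver != null) {
      receiver(message); // A receiver is attached: deliver directly.
    } else {
      _pending.add(message); // No receiver yet: keep the message for later.
    }
  }

  // Replays buffered messages in arrival order, then attaches the receiver.
  void listen(Receiver receiver) {
    while (_open && _pending.isNotEmpty) {
      receiver(_pending.removeAt(0));
    }
    if (_open) {
      _receiver = receiver;
    }
  }

  // Closes the queue and drops the receiver and any buffered messages.
  void close() {
    _open = false;
    _receiver = null;
    _pending.clear();
  }
}

// Two messages arrive before listen() and one after it.
List<String> demo() {
  final queue = BufferingQueue();
  queue.onMessage('a');
  queue.onMessage('b');

  final received = <String>[];
  queue.listen(received.add);
  queue.onMessage('c');
  return received;
}

void main() {
  print(demo()); // [a, b, c]
}
```

Draining the buffer before storing the receiver keeps delivery in arrival order: in this synchronous sketch no new message can reach the receiver until every buffered one has been replayed.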