listen method

@override
void listen(
  String? correlationId,
  IMessageReceiver receiver
)

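Listens for incoming messages until the queue is closed. Note that the implementation shown below starts listening asynchronously and returns right away, so the call itself does not block the caller.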

  • correlationId: (optional) transaction id used to trace execution through the call chain.
  • receiver: the receiver that handles incoming messages.

See IMessageReceiver and receive.

Implementation

@override
void listen(String? correlationId, IMessageReceiver receiver) {
  if (!isOpen()) {
    return;
  }

  var listenFunc = () async {
    // Subscribe to topic if needed
    await subscribe(correlationId);

    // Pass the correlation id so the trace entry can be tied to the call chain
    logger.trace(correlationId, 'Started listening for messages at %s', [getName()]);

    // Resend collected messages to receiver
    while (isOpen() && _messages.isNotEmpty) {
      var message = _messages.isNotEmpty ? _messages.removeAt(0) : null;
      if (message != null) {
        await sendMessageToReceiver(receiver, message);
      }
    }

    // Set the receiver
    if (isOpen()) {
      _receiver = receiver;
    }
  };
  listenFunc();
}
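
The buffering behavior in the implementation above can be hard to see in isolation, so here is a minimal, self-contained sketch of the same pattern: messages that arrive before a receiver is attached are stored, replayed in order once listening starts, and delivered directly afterwards. The names `Receiver`, `CollectingReceiver`, `SketchQueue`, and `onMessage` are hypothetical stand-ins and are not part of the library. Unlike the documented method, this sketch returns a `Future` from `listen` so that the example can wait for the replay to finish.

```dart
import 'dart:async';

// Hypothetical stand-in for the library's receiver interface.
abstract class Receiver {
  Future<void> receiveMessage(String message);
}

// Simple receiver that records every message it is given.
class CollectingReceiver implements Receiver {
  final List<String> received = [];

  @override
  Future<void> receiveMessage(String message) async {
    received.add(message);
  }
}

// Hypothetical queue that mirrors the buffering logic of listen().
class SketchQueue {
  final List<String> _messages = []; // messages that arrived before listen()
  Receiver? _receiver;
  bool _opened = true;

  bool isOpen() => _opened;

  // Called when a message arrives from the transport (assumed hook).
  Future<void> onMessage(String message) async {
    final receiver = _receiver;
    if (receiver != null) {
      await receiver.receiveMessage(message); // live delivery
    } else {
      _messages.add(message); // no receiver yet, keep for later
    }
  }

  // Replays buffered messages in arrival order, then attaches the receiver.
  Future<void> listen(Receiver receiver) async {
    if (!isOpen()) return;
    while (isOpen() && _messages.isNotEmpty) {
      await receiver.receiveMessage(_messages.removeAt(0));
    }
    if (isOpen()) _receiver = receiver;
  }

  void close() {
    _opened = false;
    _receiver = null;
  }
}

Future<void> main() async {
  final queue = SketchQueue();
  await queue.onMessage('a'); // buffered, no receiver attached yet
  await queue.onMessage('b'); // buffered

  final receiver = CollectingReceiver();
  await queue.listen(receiver); // replays 'a' and 'b'
  await queue.onMessage('c'); // delivered directly

  print(receiver.received); // prints [a, b, c]
  queue.close();
}
```

Because the documented `listen` returns `void` and does not await its inner closure, a caller of the real method has no handle on when the replay completes; code that depends on ordering should rely on the receiver callback rather than on the return of `listen`.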