listen method

@override
void listen(
  String? correlationId,
  IMessageReceiver receiver
)
override

Listens for incoming messages and blocks the current thread until the queue is closed.

  • correlationId: (optional) transaction id used to trace execution through the call chain.
  • receiver: the receiver that handles incoming messages.

See also: IMessageReceiver, receive.

Implementation

/// Listens for incoming messages and hands each one to [receiver].
///
/// Resets the `_cancel` flag and then loops until it is set again. Each
/// iteration calls [receive] with a 1000 ms timeout, forwards a non-null
/// message to `receiver.receiveMessage`, and then pauses for another
/// 1000 ms before polling again.
///
/// Failures while receiving or processing a single message are logged and
/// do not stop the loop.
///
/// Because this method is declared `void` and `async`, callers cannot await
/// its completion.
///
/// - [correlationId] (optional) transaction id to trace execution through
///   the call chain.
/// - [receiver] the receiver that handles incoming messages.
@override
void listen(String? correlationId, IMessageReceiver receiver) async {
  // Used both as the receive timeout and as the pause between polls (ms).
  const timeoutInterval = 1000;

  logger.trace(null, 'Started listening messages at %s', [toString()]);
  _cancel = false;
  try {
    while (!_cancel) {
      MessageEnvelope? message;

      try {
        message = await receive(correlationId, timeoutInterval);
      } catch (err) {
        // Wrap instead of casting: `err as Exception` throws when the caught
        // object is not an Exception (e.g. an Error subclass), which would
        // escape this handler and terminate the listening loop.
        logger.error(correlationId, ApplicationException().wrap(err),
            'Failed to receive the message');
      }

      // Re-check the flag: listening may have been cancelled while waiting.
      if (message != null && !_cancel) {
        try {
          await receiver.receiveMessage(message, this);
        } catch (err) {
          logger.error(correlationId, ApplicationException().wrap(err),
              'Failed to process the message');
        }
      }

      await Future<void>.delayed(
          const Duration(milliseconds: timeoutInterval));
    }
  } catch (err) {
    logger.error(correlationId, ApplicationException().wrap(err),
        'Failed to process the message');
  }
}