listen method

@override
Future listen(
  String? correlationId,
  IMessageReceiver receiver
)

Listens for incoming messages and blocks the current thread until _queue is closed. Parameters:

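  • correlationId (optional) transaction id to trace execution through the call chain.
  • receiver the receiver that is given each incoming message.

Returns a Future that receives null on completion. Throws an error if the queue is not opened or the consumer cannot be started.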

Implementation

/// Subscribes [receiver] to the messages arriving on this queue.
///
/// Checks that the queue is opened, starts a consumer on `_queue` and
/// registers a stream listener. For every AMQP message the listener converts
/// it with `_toMessage`, increments the received-messages counter, passes the
/// result to [receiver] and finally acknowledges the AMQP message, whether or
/// not processing succeeded.
///
/// - [correlationId] (optional) transaction id used when logging.
/// - [receiver] the receiver that is given each incoming message.
///
/// The returned future completes with `null` as soon as the subscription is
/// set up; it does not wait for the queue to be closed.
///
/// Rethrows whatever `_checkOpened` or `_queue.consume()` throws, after
/// logging it.
@override
Future listen(String? correlationId, IMessageReceiver receiver) async {
  // The logger is handed an Exception below. Wrap anything else (e.g. an
  // Error) instead of casting, so a failed cast can never mask the original
  // failure or abort the handler that is reporting it.
  Exception asException(Object err) =>
      err is Exception ? err : Exception(err.toString());

  try {
    // NOTE(review): an empty correlation id is passed here; presumably this
    // should be correlationId - confirm against _checkOpened's signature.
    _checkOpened('');
  } catch (err) {
    logger.error(correlationId, asException(err),
        'RabbitMQMessageQueue:Listen: Can\'t start listen $err');
    rethrow;
  }

  logger.debug(correlationId, 'Started listening messages at %s', [name]);

  final amqp.Consumer consumer;
  try {
    consumer = await _queue!.consume();
    _consumer = consumer;
  } catch (err) {
    logger.error(correlationId, asException(err),
        'RabbitMQMessageQueue:Listen: Can\'t consume to _queue $err');
    rethrow;
  }

  consumer.listen((amqp.AmqpMessage msg) async {
    final message = _toMessage(msg);
    counters.incrementOne('_queue.$name.received_messages');
    logger.debug(message?.correlation_id, 'Received message %s via %s',
        [message, name]);
    try {
      if (message == null) {
        // Nothing to hand over; report it through the regular error path.
        throw Exception('Received AMQP message could not be converted');
      }
      // Awaited so that a failure raised asynchronously by the receiver is
      // caught here and the message is acknowledged only after processing.
      await receiver.receiveMessage(message, this);
    } catch (err) {
      logger.error(
          message?.correlation_id,
          asException(err),
          'Processing received message %s error in _queue %s',
          [message, name]);
    } finally {
      // Always acknowledge, matching the original behaviour of acking after
      // a processing error, so the message is not left unacknowledged.
      msg.ack();
    }
  });
}