listen method
Listens for incoming messages and blocks the current thread until _queue is closed. Parameters:
correlationId
(optional) transaction id to trace execution through the call chain. Returns a Future that receives null on completion. Throws an error.
Implementation
/// Subscribes [receiver] to the messages delivered by this queue.
///
/// Checks that the queue is opened, starts a consumer on the underlying
/// queue and passes every incoming message, converted with `_toMessage`,
/// to [receiver]. A message is acknowledged once the receiver has finished
/// with it, whether processing succeeded or failed; failures are logged.
///
/// - [correlationId] (optional) transaction id used to trace execution
///   through the call chain.
/// - [receiver] the receiver that handles incoming messages.
///
/// The returned future completes as soon as the consumer subscription is
/// set up; messages keep arriving afterwards.
///
/// Logs and rethrows any error raised by the opened-state check or while
/// starting the consumer.
@override
Future listen(String? correlationId, IMessageReceiver receiver) async {
  // `catch` may deliver an Error (for example a failed null check) as well
  // as an Exception. Casting with `as Exception` would then throw inside the
  // handler and hide the original failure, so wrap such values instead.
  Exception asException(Object err) =>
      err is Exception ? err : Exception(err.toString());

  try {
    // NOTE(review): an empty string is passed here rather than
    // correlationId; confirm against the signature of _checkOpened.
    _checkOpened('');
  } catch (err) {
    logger.error(correlationId, asException(err),
        'RabbitMQMessageQueue:Listen: Can\'t start listen $err');
    rethrow;
  }

  logger.debug(correlationId, 'Started listening messages at %s', [name]);

  final consumer;
  try {
    consumer = await _queue!.consume();
    _consumer = consumer;
  } catch (err) {
    logger.error(correlationId, asException(err),
        'RabbitMQMessageQueue:Listen: Can\'t consume to _queue$err');
    rethrow;
  }

  consumer.listen((amqp.AmqpMessage msg) async {
    var message = _toMessage(msg);
    counters.incrementOne('_queue.$name.received_messages');
    logger.debug(message?.correlation_id, 'Received message %s via %s',
        [message, name]);
    try {
      // Awaited so that an asynchronous failure in the receiver is caught
      // below and the message is acknowledged only after processing ends.
      await receiver.receiveMessage(message!, this);
    } catch (err) {
      logger.error(message?.correlation_id, asException(err),
          'Processing received message %s error in _queue %s',
          [message, name]);
    }
    msg.ack();
  });
}