receive method
override
Receives an incoming message and removes it from the queue.
correlationId
(optional) transaction id to trace execution through call chain.waitTimeout
a timeout in milliseconds to wait for a message to come. Return Future that receives a message Throws error.
Implementation
@override
Future<MessageEnvelope?> receive(
String? correlationId, int waitTimeout) async {
var err;
MessageEnvelope? message;
var messageReceived = false;
var checkIntervalMs = 100;
var i = 0;
// TODO maybe need update this realization
for (; i < waitTimeout && !messageReceived;) {
i = i + checkIntervalMs;
await Future.delayed(Duration(milliseconds: checkIntervalMs), () {
if (_messages.isEmpty) {
return null;
}
try {
// Get message the the queue
message = _messages.removeAt(0);
if (message != null) {
// Generate and set locked token
var lockedToken = _lockTokenSequence++;
message!.setReference(lockedToken);
// Add messages to locked messages list
var lockedMessage = LockedMessage();
var now = DateTime.now().toUtc();
lockedMessage.expirationTime =
now.add(Duration(milliseconds: waitTimeout));
lockedMessage.message = message;
lockedMessage.timeout = waitTimeout;
_lockedMessages[lockedToken] = lockedMessage;
// Instrument the process
counters.incrementOne('queue.' + getName() + '.received_messages');
logger.debug(message!.correlation_id, 'Received message %s via %s',
[message, toString()]);
}
} catch (ex) {
err = ex;
}
messageReceived = true;
return null;
});
}
if (err != null) {
throw err;
}
return message;
}