receive method
Receives an incoming message and removes it from the queue.
- correlationId: (optional) transaction id to trace execution through the call chain.
- waitTimeout: a timeout in milliseconds to wait for a message to arrive.
Returns the received message, or null if no message arrives within the timeout.
Implementation
@override
Future<MessageEnvelope?> receive(
    String? correlationId, int waitTimeout) async {
  checkOpen(correlationId);

  // Subscribe to the topic if needed
  await subscribe(correlationId);

  var checkIntervalMs = 100;
  var elapsedTime = 0;

  // Try to take the first message from the queue
  var message = _messages.isNotEmpty ? _messages.removeAt(0) : null;

  // Poll until a message arrives or the timeout elapses
  while (elapsedTime < waitTimeout && message == null) {
    // Wait for one check interval
    await Future.delayed(Duration(milliseconds: checkIntervalMs), () {});
    elapsedTime += checkIntervalMs;

    // Try to take the first message from the queue again
    message = _messages.isNotEmpty ? _messages.removeAt(0) : null;
  }

  return message;
}
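
The polling behavior above can be tried in isolation with a small stand-in. This is a minimal sketch, not the library's own class: `InMemoryQueue`, its `send` method, and the `checkIntervalMs` named parameter are hypothetical names introduced for illustration, and the sketch leaves out `checkOpen`, `subscribe`, and `correlationId`.

```dart
import 'dart:async';

/// Hypothetical stand-in for a message queue; not part of the documented API.
class InMemoryQueue<T> {
  final List<T> _messages = [];

  /// Appends a message to the end of the queue.
  void send(T message) => _messages.add(message);

  /// Polls every [checkIntervalMs] until a message arrives or
  /// [waitTimeout] milliseconds have elapsed. Returns null on timeout.
  Future<T?> receive(int waitTimeout, {int checkIntervalMs = 100}) async {
    var elapsedTime = 0;
    var message = _messages.isNotEmpty ? _messages.removeAt(0) : null;
    while (elapsedTime < waitTimeout && message == null) {
      await Future.delayed(Duration(milliseconds: checkIntervalMs));
      elapsedTime += checkIntervalMs;
      message = _messages.isNotEmpty ? _messages.removeAt(0) : null;
    }
    return message;
  }
}

Future<void> main() async {
  final queue = InMemoryQueue<String>();

  // Nothing is queued, so receive gives up after about 200 ms.
  final none = await queue.receive(200);
  print(none);

  // A message sent while receive is waiting is picked up on a later poll.
  Timer(Duration(milliseconds: 150), () => queue.send('hello'));
  final got = await queue.receive(1000);
  print(got);
}
```

Because the loop sleeps in fixed steps, a message can wait up to one check interval before it is returned, and the total wait is rounded up to a multiple of that interval.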