receive method
Receives an incoming message and removes it from the queue.
correlationId
(optional) transaction id to trace execution through the call chain.
waitTimeout
a timeout in milliseconds to wait for a message to arrive.
Returns a Future that completes with the received message, or null if no message arrives before the timeout elapses.
Throws an error.
Implementation
@override
Future<MessageEnvelope?> receive(
    String? correlationId, int waitTimeout) async {
  checkOpen(correlationId);

  // Subscribe to topic if needed
  await subscribe(correlationId);
  MessageEnvelope? message;

  // Return the message immediately if one already exists
  if (_messages.isNotEmpty) {
    message = _messages.isNotEmpty ? _messages.removeAt(0) : null;
    return message;
  }

  // Otherwise poll until a message arrives, the queue closes,
  // or the timeout elapses
  var checkInterval = 100;
  var elapsedTime = 0;
  while (true) {
    var test = isOpen() && elapsedTime < waitTimeout && message == null;
    if (!test) break;
    message = await Future<MessageEnvelope?>.delayed(
        Duration(milliseconds: checkInterval), () async {
      message = _messages.isNotEmpty ? _messages.removeAt(0) : null;
      return message;
    });
    elapsedTime += checkInterval;
  }
  return message;
}
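
The polling behavior of the implementation above can be tried in isolation with a minimal sketch. TinyQueue, its send and close methods, and the checkInterval parameter are hypothetical names introduced only for illustration and are not part of the library. The sketch assumes plain String messages in place of MessageEnvelope and leaves out the open check and topic subscription.

```dart
import 'dart:async';

// Hypothetical stand-in for the queue; not the library's class.
class TinyQueue {
  final List<String> _messages = [];
  bool _open = true;

  void send(String message) => _messages.add(message);
  void close() => _open = false;

  /// Polls every [checkInterval] ms until a message arrives, the queue
  /// is closed, or [waitTimeout] ms have elapsed. Returns null on timeout.
  Future<String?> receive(int waitTimeout, {int checkInterval = 100}) async {
    // Return immediately if a message is already queued.
    if (_messages.isNotEmpty) return _messages.removeAt(0);

    var elapsedTime = 0;
    while (_open && elapsedTime < waitTimeout) {
      await Future<void>.delayed(Duration(milliseconds: checkInterval));
      elapsedTime += checkInterval;
      if (_messages.isNotEmpty) return _messages.removeAt(0);
    }
    return null;
  }
}

Future<void> main() async {
  final queue = TinyQueue();

  // A message sent 250 ms from now is picked up by a later poll.
  Timer(const Duration(milliseconds: 250), () => queue.send('hello'));
  final received = await queue.receive(1000);
  print(received); // hello

  // With an empty queue, the call gives up once the timeout is used up.
  final nothing = await queue.receive(300);
  print(nothing); // null
}
```

Because the wait is counted in whole check intervals, the effective timeout is rounded up to the next multiple of the interval, and a message may sit in the queue for up to one interval before it is returned. Callers should treat a null result as "nothing arrived in time" rather than as an error.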