receive method

@override
Future<MessageEnvelope?> receive(
  String? correlationId,
  int waitTimeout
)

Receives an incoming message and removes it from the queue.

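  • correlationId (optional) transaction id to trace execution through the call chain.
  • waitTimeout a timeout in milliseconds to wait for a message to arrive.

Returns a Future that completes with the received message, or with null if no message arrives before the timeout or the queue is closed while waiting.

Throws an error if the call fails.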
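Because the result is nullable, a caller has to treat null as "nothing arrived in time". The following is a minimal caller-side sketch of that, not code from the library: the `Receive` typedef, the `drain` helper, the `fakeReceive` stub and the `'trace-123'` correlation id are all hypothetical, and a `String` payload stands in for `MessageEnvelope` so the example runs on its own.

```dart
import 'dart:async';

// Hypothetical stand-in for the queue's receive method; a String payload
// replaces MessageEnvelope to keep the sketch self-contained.
typedef Receive = Future<String?> Function(
    String? correlationId, int waitTimeout);

/// Collects messages until receive reports a timeout by returning null.
Future<List<String>> drain(Receive receive, {int waitTimeout = 200}) async {
  final received = <String>[];
  while (true) {
    final message = await receive('trace-123', waitTimeout);
    if (message == null) break; // nothing arrived within waitTimeout ms
    received.add(message);
  }
  return received;
}

Future<void> main() async {
  final pending = ['first', 'second'];

  // Fake receiver: hands out pending items, then simulates a timeout.
  Future<String?> fakeReceive(String? correlationId, int waitTimeout) async =>
      pending.isNotEmpty ? pending.removeAt(0) : null;

  final messages = await drain(fakeReceive);
  print(messages); // [first, second]
}
```

With a real queue, `drain` would be given the queue's own `receive` method, adapted to whatever payload type the caller needs.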

Implementation

@override
Future<MessageEnvelope?> receive(
    String? correlationId, int waitTimeout) async {
  checkOpen(correlationId);

  // Subscribe to topic if needed
  await subscribe(correlationId);

  MessageEnvelope? message;

  // Return a message immediately if one is already buffered
  if (_messages.isNotEmpty) {
    return _messages.removeAt(0);
  }

  // Otherwise poll every checkInterval milliseconds until a message arrives,
  // the queue is closed, or waitTimeout has elapsed
  var checkInterval = 100;
  var elapsedTime = 0;
  while (isOpen() && elapsedTime < waitTimeout && message == null) {
    await Future<void>.delayed(Duration(milliseconds: checkInterval));
    message = _messages.isNotEmpty ? _messages.removeAt(0) : null;
    elapsedTime += checkInterval;
  }

  return message;
}
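
The polling behaviour above can be tried in isolation with a small stand-in. This is only a sketch under stated assumptions: `PollingBuffer`, `add` and `close` are hypothetical names, a `String` replaces `MessageEnvelope`, and the subscription and open-state checks of the real method are reduced to a single `_open` flag.

```dart
import 'dart:async';

/// Hypothetical in-memory stand-in for the queue; String replaces
/// MessageEnvelope.
class PollingBuffer {
  final List<String> _messages = [];
  bool _open = true;

  void add(String message) => _messages.add(message);
  void close() => _open = false;

  /// Polls every [checkInterval] ms until a message arrives, the buffer is
  /// closed, or at least [waitTimeout] ms have elapsed.
  Future<String?> receive(int waitTimeout, {int checkInterval = 100}) async {
    // Fast path: a message is already buffered.
    if (_messages.isNotEmpty) return _messages.removeAt(0);

    var elapsedTime = 0;
    while (_open && elapsedTime < waitTimeout) {
      await Future<void>.delayed(Duration(milliseconds: checkInterval));
      if (_messages.isNotEmpty) return _messages.removeAt(0);
      elapsedTime += checkInterval;
    }
    return null; // timed out or closed without receiving anything
  }
}

Future<void> main() async {
  final buffer = PollingBuffer();

  // A message added 250 ms from now is picked up on a later poll.
  Timer(const Duration(milliseconds: 250), () => buffer.add('hello'));
  print(await buffer.receive(1000)); // hello

  // Nothing arrives, so the call gives up after about 300 ms.
  print(await buffer.receive(300)); // null
}
```

Because elapsed time is counted in whole check intervals, a wait is effectively rounded up to the next multiple of the interval: with a 100 ms interval, a 250 ms timeout waits for about 300 ms.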