receive method

@override
Future<MessageEnvelope?> receive(
  String? correlationId,
  int waitTimeout
)

Receives an incoming message and removes it from the queue.

  • correlationId (optional) transaction id used to trace execution through the call chain.
  • waitTimeout timeout in milliseconds to wait for a message to arrive.

Returns the received message, or null if no message arrives within the timeout.
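
The contract described above (a message, or null once the timeout elapses) can be illustrated with a minimal sketch. FakeQueue, push and demo are hypothetical names invented for this example and are not part of the library; the stand-in carries plain strings instead of MessageEnvelope and only mirrors the polling behavior shown in the implementation on this page.

```dart
import 'dart:async';

/// Hypothetical stand-in for the real queue (assumption, not library code).
/// It models only the receive() contract: a message, or null on timeout.
class FakeQueue {
  final List<String> _messages = [];

  /// Hypothetical helper that simulates an incoming message.
  void push(String message) => _messages.add(message);

  Future<String?> receive(String? correlationId, int waitTimeout) async {
    const checkIntervalMs = 100;
    var elapsedTime = 0;

    // Take the first buffered message, if any.
    var message = _messages.isNotEmpty ? _messages.removeAt(0) : null;

    // Poll until a message shows up or the timeout is used up.
    while (elapsedTime < waitTimeout && message == null) {
      await Future<void>.delayed(
          const Duration(milliseconds: checkIntervalMs));
      elapsedTime += checkIntervalMs;
      message = _messages.isNotEmpty ? _messages.removeAt(0) : null;
    }
    return message;
  }
}

/// Runs two receive calls and reports what each one produced.
Future<List<String>> demo() async {
  final queue = FakeQueue();

  // A message arrives 150 ms from now, while the first call is waiting.
  Timer(const Duration(milliseconds: 150), () => queue.push('hello'));

  final first = await queue.receive('trace-1', 1000);
  // Nothing else is queued, so the second call gives up after 200 ms.
  final second = await queue.receive('trace-1', 200);

  return [first ?? 'timed out', second ?? 'timed out'];
}

Future<void> main() async {
  final results = await demo();
  print(results[0]); // prints "hello"
  print(results[1]); // prints "timed out"
}
```

Callers should treat a null result as "nothing arrived in time" rather than as an error, and decide whether to call receive again.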

Implementation

@override
Future<MessageEnvelope?> receive(
    String? correlationId, int waitTimeout) async {
  checkOpen(correlationId);

  // Subscribe to topic if needed
  await subscribe(correlationId);

  var checkIntervalMs = 100;
  var elapsedTime = 0;

  // Get a message from the queue
  var message = _messages.isNotEmpty ? _messages.removeAt(0) : null;

  while (elapsedTime < waitTimeout && message == null) {
    // Wait for a while
    await Future.delayed(Duration(milliseconds: checkIntervalMs), () {});
    elapsedTime += checkIntervalMs;

    // Try again to get a message from the queue
    message = _messages.isNotEmpty ? _messages.removeAt(0) : null;
  }

  return message;
}
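
Because the loop above checks the queue every 100 ms, the wait is effectively rounded up to the next multiple of the check interval when no message arrives. The sketch below works out that upper bound; effectiveWaitMs is a hypothetical helper written for this illustration, not a library function, and it ignores scheduling overhead and the time spent in subscribe.

```dart
/// Hypothetical helper (assumption, not library code): the longest time the
/// polling loop sleeps when no message ever arrives.
int effectiveWaitMs(int waitTimeout, {int checkIntervalMs = 100}) {
  // A zero or negative timeout skips the loop: one queue check, no waiting.
  if (waitTimeout <= 0) return 0;

  // Round up to a whole number of check intervals.
  final intervals = (waitTimeout + checkIntervalMs - 1) ~/ checkIntervalMs;
  return intervals * checkIntervalMs;
}

void main() {
  print(effectiveWaitMs(0)); // prints 0
  print(effectiveWaitMs(200)); // prints 200
  print(effectiveWaitMs(250)); // prints 300
}
```

In practice this means a waitTimeout of 0 performs a single non-blocking check, and timeouts that are not multiples of 100 ms may overshoot by up to one interval.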