receive method

  1. @override
Future<MessageEnvelope?> receive(
  1. String? correlationId,
  2. int waitTimeout
)
override

Receives an incoming message and removes it from the queue.

  • correlationId (optional) transaction id to trace execution through call chain.
  • waitTimeout a timeout in milliseconds to wait for a message to come. Return Future that receives a message Throws error.

Implementation

@override
Future<MessageEnvelope?> receive(
    String? correlationId, int waitTimeout) async {
  var err;
  MessageEnvelope? message;
  var messageReceived = false;

  var checkIntervalMs = 100;
  var i = 0;
  // TODO maybe need update this realization
  for (; i < waitTimeout && !messageReceived;) {
    i = i + checkIntervalMs;

    await Future.delayed(Duration(milliseconds: checkIntervalMs), () {
      if (_messages.isEmpty) {
        return null;
      }

      try {
        // Get message the the queue
        message = _messages.removeAt(0);

        if (message != null) {
          // Generate and set locked token
          var lockedToken = _lockTokenSequence++;
          message!.setReference(lockedToken);

          // Add messages to locked messages list
          var lockedMessage = LockedMessage();
          var now = DateTime.now().toUtc();
          lockedMessage.expirationTime =
              now.add(Duration(milliseconds: waitTimeout));
          lockedMessage.message = message;
          lockedMessage.timeout = waitTimeout;
          _lockedMessages[lockedToken] = lockedMessage;

          // Instrument the process
          counters.incrementOne('queue.' + getName() + '.received_messages');
          logger.debug(message!.correlation_id, 'Received message %s via %s',
              [message, toString()]);
        }
      } catch (ex) {
        err = ex;
      }

      messageReceived = true;
      return null;
    });
  }

  if (err != null) {
    throw err;
  }

  return message;
}