dispatch method

Future dispatch()

Dispatches this job for execution through a worker isolate.

Behavior:

  • resolves or creates a QueueJob record
  • skips execution and returns the string "already complete" if the job is already complete
  • marks the job as processing, then as queued if a worker has to be spawned first
  • spawns a worker isolate for the current runtime type if needed
  • sends the job to the worker and awaits the reply
  • marks the queue record complete on success

On failure, the error is logged and the queue record is marked as failed when possible. The error is not rethrown, so the returned future completes with null.
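The status handling described above can be sketched without any isolates or storage. This is only an illustration under stated assumptions: JobStatus, JobRecord and runTracked are hypothetical names invented for the example, and the record lives in memory instead of being saved like a real QueueJob.

```dart
// Hypothetical stand-ins for the QueueJob status values used by dispatch().
enum JobStatus { created, processing, queued, complete, failed }

// In-memory substitute for a persisted QueueJob record (assumption).
class JobRecord {
  JobStatus status = JobStatus.created;
}

// Runs [task] while tracking status the way the behavior list describes.
Future<Object?> runTracked(
  JobRecord record,
  Future<Object?> Function() task,
) async {
  if (record.status == JobStatus.complete) {
    return 'already complete'; // skip work that has already finished
  }
  record.status = JobStatus.processing;
  try {
    final result = await task();
    record.status = JobStatus.complete;
    return result;
  } catch (_) {
    record.status = JobStatus.failed; // error is swallowed; caller gets null
    return null;
  }
}

Future<void> main() async {
  final ok = JobRecord();
  print(await runTracked(ok, () async => 'sent')); // sent
  print(await runTracked(ok, () async => 'sent')); // already complete

  final bad = JobRecord();
  print(await runTracked(bad, () async => throw StateError('boom'))); // null
  print(bad.status); // JobStatus.failed
}
```

Because a failure surfaces as null instead of an exception, a caller that needs to tell failure from a job that legitimately returns null would have to inspect the record's status.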

Example:

final result = await SendWelcomeEmail('jane@example.com').dispatch();
print(result);

Implementation

Future<dynamic> dispatch() async {
  try {
    QueueJob? jobRecord = await job;

    if (jobRecord == null) {
      final payloadField = {'_name': runtimeType.toString(), ...toJson()};

      final modelData = {'status': 'created', 'payload': jsonEncode(payloadField)};

      jobRecord = QueueJob.fromJson(modelData);
      await jobRecord.save();
    }

    if (jobRecord.status == .complete) {
      return "already complete";
    }

    jobRecord.status = .processing;

    await jobRecord.save();

    final jobType = runtimeType;

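    // Spawn one worker isolate per job type on first use and cache its handle.
    if (!_workerRegistry.containsKey(jobType)) {
      _workerRegistry[jobType] = await _spawnWorker();
      jobRecord.status = .queued;
      // Awaited so this save cannot race with the final "complete" save.
      await jobRecord.save();
    }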
    final responsePort = ReceivePort();

    _workerRegistry[jobType]!.send({'job': this, 'replyTo': responsePort.sendPort});

    final result = await responsePort.first;
    responsePort.close();

    jobRecord.status = .complete;
    await jobRecord.save();

    return result;
  } catch (e, stack) {
    App().archeryLogger.error("Error in Queue Dispatch", {
      "origin": "mixin Queueable.dispatch",
      "error": e.toString(),
      "stack": stack.toString(),
    });
    final jobRecord = await job;
    jobRecord?.status = .failed;
    await jobRecord?.save();
  }
}
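
The worker round trip in the implementation above (send a message carrying a replyTo port, then await the first reply) can be tried in isolation with dart:isolate alone. The sketch below is a guess at the general shape, not the library's actual _spawnWorker: workerMain, spawnWorker and request are hypothetical names, the "job" is just an int that the worker doubles, and the null shutdown message is an addition for the example.

```dart
import 'dart:isolate';

// Hypothetical worker entry point: it hands its own SendPort back to the
// spawner, then answers each request on that request's 'replyTo' port.
void workerMain(SendPort handshake) {
  final inbox = ReceivePort();
  handshake.send(inbox.sendPort);
  inbox.listen((message) {
    if (message == null) {
      inbox.close(); // shutdown signal used only in this sketch
      return;
    }
    final req = message as Map;
    final replyTo = req['replyTo'] as SendPort;
    final value = req['job'] as int;
    replyTo.send(value * 2); // stand-in for actually running the job
  });
}

// Spawns the worker and waits for the SendPort it reports back.
Future<SendPort> spawnWorker() async {
  final handshake = ReceivePort();
  await Isolate.spawn(workerMain, handshake.sendPort);
  return await handshake.first as SendPort;
}

// Sends one request and awaits the single reply, mirroring dispatch().
Future<Object?> request(SendPort worker, int job) {
  final responsePort = ReceivePort();
  worker.send({'job': job, 'replyTo': responsePort.sendPort});
  return responsePort.first;
}

Future<void> main() async {
  final worker = await spawnWorker();
  print(await request(worker, 21)); // 42
  worker.send(null); // let the worker close its port
}
```

Using a fresh ReceivePort per request keeps replies from different dispatches from interleaving on a shared port, at the cost of one short-lived port per call.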