dispatch method
Dispatches this job for execution through a worker isolate.
Behavior:
- resolves or creates a QueueJob record
- skips execution if the job is already complete
- marks the job as processing/queued
- spawns a worker isolate for the current runtime type if needed
- sends the job to the worker and awaits the reply
- marks the queue record complete on success
On failure, the queue record is marked as failed when possible.
Example:
final result = await SendWelcomeEmail('jane@example.com').dispatch();
print(result);
Implementation
Future<dynamic> dispatch() async {
try {
QueueJob? jobRecord = await job;
if (jobRecord == null) {
final payloadField = {'_name': runtimeType.toString(), ...toJson()};
final modelData = {'status': 'created', 'payload': jsonEncode(payloadField)};
jobRecord = QueueJob.fromJson(modelData);
await jobRecord.save();
}
if (jobRecord.status == .complete) {
return "already complete";
}
jobRecord.status = .processing;
await jobRecord.save();
final jobType = runtimeType;
if (!_workerRegistry.containsKey(jobType)) {
_workerRegistry[jobType] = await _spawnWorker();
jobRecord.status = .queued;
jobRecord.save();
}
final responsePort = ReceivePort();
_workerRegistry[jobType]!.send({'job': this, 'replyTo': responsePort.sendPort});
final result = await responsePort.first;
responsePort.close();
jobRecord.status = .complete;
await jobRecord.save();
return result;
} catch (e, stack) {
App().archeryLogger.error("Error in Queue Dispatch", {"origin": "mixin Queueable.dispatch", "error": e.toString(), "stack": stack.toString()});
final jobRecord = await job;
jobRecord?.status = .failed;
await jobRecord?.save();
}
}