process method

  1. @override
Future<void> process()
override

Starts processing queued jobs.

This method is typically used in a worker loop.

Example:

await queueDriver.process(); // start the worker

Implementation

@override
Future<void> process() async {
  JobContext? readyJob;

  await _lock.protect(() async {
    await _ensureLoaded();
    readyJob = _findNextReadyJob();
    if (readyJob != null) {
      // Mark as processing in memory to prevent other workers from grabbing
      readyJob!.status = JobStatus.processing;
      await _persistToFile();
    }
  });

  if (readyJob == null) return;

  try {
    await executeJob(readyJob!);
  } finally {
    await _lock.protect(() async {
      // Remove completed/failed job
      if (readyJob!.status == JobStatus.completed ||
          readyJob!.status == JobStatus.deadLettered) {
        _memoryCache.remove(readyJob);
        await _persistToFile();
      }

      // Update metrics
      if (metrics != null) {
        metrics!.recordQueueDepth(_memoryCache.length);
      }
    });
  }
}