DatabaseQueueDriver class

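A QueueDriver backed by PostgreSQL, or by another SQL database that supports SELECT ... FOR UPDATE SKIP LOCKED.

Dequeues atomically with SELECT ... FOR UPDATE SKIP LOCKED: a worker locks the row it claims, and concurrent workers skip locked rows instead of waiting on them, so a job is not handed to two workers at once.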

Required Table Schema

CREATE TABLE IF NOT EXISTS kronix_jobs (
  id TEXT PRIMARY KEY,
  name TEXT NOT NULL,
  queue TEXT NOT NULL DEFAULT 'default',
  status TEXT NOT NULL DEFAULT 'pending',
  attempts INTEGER NOT NULL DEFAULT 0,
  max_retries INTEGER NOT NULL DEFAULT 3,
  payload TEXT,
  last_error TEXT,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  available_at TIMESTAMPTZ,
  finished_at TIMESTAMPTZ,
  reserved_by TEXT
);

CREATE INDEX IF NOT EXISTS idx_kronix_jobs_pop ON kronix_jobs(queue, status, available_at);
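
To make the dequeue step concrete, the statement below is a minimal sketch of a SKIP LOCKED claim against the schema above. It is not taken from the driver's source: the 'reserved' status name, the 'worker-1' identifier, and the oldest-first ordering are assumptions for illustration, and a real driver would presumably bind the queue name and worker id as parameters rather than inline them.

```sql
-- Claim the oldest ready job on the 'default' queue for worker 'worker-1'.
-- 'reserved' is an assumed status name; only 'pending' appears in the schema.
UPDATE kronix_jobs
SET status = 'reserved',
    reserved_by = 'worker-1'
WHERE id = (
  SELECT id
  FROM kronix_jobs
  WHERE queue = 'default'
    AND status = 'pending'
    AND (available_at IS NULL OR available_at <= NOW())
  ORDER BY created_at
  LIMIT 1
  FOR UPDATE SKIP LOCKED  -- rows locked by another worker are skipped, not waited on
)
RETURNING id, name, payload, attempts, max_retries;
```

The filter on queue, status, and available_at lines up with the columns of idx_kronix_jobs_pop, which is presumably why the index is defined in that order. If no row qualifies, the statement returns nothing, which would correspond to a null job.
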

Implemented types

QueueDriver

Constructors

DatabaseQueueDriver({required DatabaseAdapter db, String table = 'kronix_jobs', String? workerId})
Creates a new DatabaseQueueDriver.

Properties

hashCode → int
The hash code for this object.
no setter, inherited
runtimeType → Type
A representation of the runtime type of the object.
no setter, inherited
workerId → String
A unique worker identifier used for distributed locking.
final

Methods

clear([String queue = 'default']) → Future<void>
Removes all jobs from the given queue. Intended primarily for testing.
override
clearDeadLetters() → Future<void>
Clears the dead letter queue.
override
complete(Job job) → Future<void>
Marks a job as completed and removes it from the active set.
override
createTable() → Future<void>
Creates the jobs table if it doesn't exist.
deadLetterJobs() → Future<List<Map<String, dynamic>>>
Returns all permanently failed jobs (the dead letter queue).
override
fail(Job job) → Future<void>
Removes a permanently failed job from the active queue and stores it in the dead letter queue.
override
fetchStats() → Future<Map<String, dynamic>>
Returns aggregate metrics from the database.
noSuchMethod(Invocation invocation) → dynamic
Invoked when a nonexistent method or property is accessed.
inherited
pop([String queue = 'default']) → Future<Job?>
Atomically pops the next available job from the queue.
override
push(Job job, [String queue = 'default']) → Future<void>
Pushes a job onto the specified queue.
override
recoverStaleJobs({Duration timeout = const Duration(minutes: 5)}) → Future<int>
Recovers stale jobs: jobs that were reserved but not completed within timeout, typically because their worker crashed.
registerJobFactory(String name, JobFactory factory) → void
Registers a factory to reconstruct a Job of the given name from a payload.
release(Job job) → Future<void>
Returns a job to the queue for retry, respecting its availableAt.
override
retryDeadLetter(String jobId) → Future<bool>
Retries a dead-letter job by its jobId, moving it back to the queue.
override
size([String queue = 'default']) → Future<int>
Returns the number of pending jobs in the queue.
override
toString() → String
A string representation of this object.
inherited
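
As a rough illustration of how size, release, and complete might map onto the table from the Required Table Schema section, the statements below sketch one plausible set of row updates. They are assumptions, not the driver's actual queries: the 'completed' status name, the job id 'job-123', and the 30-second retry delay are hypothetical, and only 'pending' is confirmed by the schema.

```sql
-- size('default'): count jobs still waiting on the queue.
SELECT COUNT(*)
FROM kronix_jobs
WHERE queue = 'default'
  AND status = 'pending';

-- release(job): hand a reserved job back for a later retry.
-- The interval stands in for the job's own availableAt value.
UPDATE kronix_jobs
SET status = 'pending',
    reserved_by = NULL,
    available_at = NOW() + INTERVAL '30 seconds'
WHERE id = 'job-123';

-- complete(job): record completion so the job leaves the active set.
-- 'completed' is an assumed status name.
UPDATE kronix_jobs
SET status = 'completed',
    reserved_by = NULL,
    finished_at = NOW()
WHERE id = 'job-123';
```

Clearing reserved_by on release and completion keeps the column meaningful as a record of which worker currently holds a job.
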

Operators

operator ==(Object other) → bool
The equality operator.
inherited