DatabaseQueueDriver class
A PostgreSQL (or any SQL database) backed QueueDriver.
Uses SELECT ... FOR UPDATE SKIP LOCKED for atomic dequeue, preventing
double-processing across multiple workers (distributed locking).
Required Table Schema
CREATE TABLE IF NOT EXISTS kronix_jobs (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
queue TEXT NOT NULL DEFAULT 'default',
status TEXT NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL DEFAULT 3,
payload TEXT,
last_error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
available_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
reserved_by TEXT
);
CREATE INDEX idx_kronix_jobs_pop ON kronix_jobs(queue, status, available_at);
- Implemented types
Constructors
- DatabaseQueueDriver({required DatabaseAdapter db, String table = 'kronix_jobs', String? workerId})
- Creates a new DatabaseQueueDriver.
Properties
Methods
-
clear(
[String queue = 'default']) → Future< void> -
Clears all jobs from the
queue. Primarily for testing.override -
clearDeadLetters(
) → Future< void> -
Clears the dead letter queue.
override
-
complete(
Job job) → Future< void> -
Marks a
jobas completed and removes it from the active set.override -
createTable(
) → Future< void> - Creates the jobs table if it doesn't exist.
-
deadLetterJobs(
) → Future< List< Map< >String, dynamic> > -
Returns all permanently failed jobs (dead letter queue).
override
-
fail(
Job job) → Future< void> -
Permanently removes a failed
joband stores it in the dead letter queue.override -
fetchStats(
) → Future< Map< String, dynamic> > - Returns aggregate metrics from the database.
-
noSuchMethod(
Invocation invocation) → dynamic -
Invoked when a nonexistent method or property is accessed.
inherited
-
pop(
[String queue = 'default']) → Future< Job?> -
Pops the next available job from the
queueatomically.override -
push(
Job job, [String queue = 'default']) → Future< void> -
Pushes a
jobonto the specifiedqueue.override -
recoverStaleJobs(
{Duration timeout = const Duration(minutes: 5)}) → Future< int> - Recovers stale jobs that were reserved but never completed (worker crashed).
-
registerJobFactory(
String name, JobFactory factory) → void -
Registers a factory to reconstruct a Job of the given
namefrom a payload. -
release(
Job job) → Future< void> -
Returns a
jobto the queue for retry, respecting itsavailableAt.override -
retryDeadLetter(
String jobId) → Future< bool> -
Retries a dead-letter job by its
jobId, moving it back to the queue.override -
size(
[String queue = 'default']) → Future< int> -
Returns the number of pending jobs in the
queue.override -
toString(
) → String -
A string representation of this object.
inherited
Operators
-
operator ==(
Object other) → bool -
The equality operator.
inherited