duraq 1.0.0 copy "duraq: ^1.0.0" to clipboard
duraq: ^1.0.0 copied to clipboard

A durable queuing system implemented in Dart

DuraQ #

A robust, durable queuing system implemented in Dart, providing persistent queue management with multiple storage backend support.

Overview #

DuraQ is designed to provide a reliable queuing system with:

  • Type-safe queue operations
  • Persistent storage options
  • Multiple queue support
  • Entry tracking and monitoring
  • Extensible storage backends

Quick Start #

  1. Add to your pubspec.yaml:
dependencies:
  duraq: ^0.0.1
  1. Create a queue manager with SQLite storage:
import 'package:duraq/duraq.dart';
import 'package:path/path.dart' as path;

void main() async {
  // Initialize storage
  final dbPath = path.join(Directory.current.path, 'queue.db');
  final storage = SQLiteStorage(dbPath: dbPath);

  // Create queue manager
  final manager = QueueManager(storage);

  // Get a typed queue
  final emailQueue = manager.queue<String>('emails');

  try {
    // Enqueue items
    await emailQueue.enqueue('Welcome email to user@example.com');

    // Process items
    final email = await emailQueue.dequeue();
    if (email != null) {
      await processEmail(email);
    }

    // Check queue status
    final remaining = await emailQueue.length;
    print('Remaining emails: $remaining');
  } finally {
    // Clean up resources
    storage.dispose();
  }
}

Features #

  • Type-safe Queues: Generic support for any data type
  • Multiple Queues: Manage different queue types in one manager
  • Persistent Storage: Built-in SQLite support with ACID compliance
  • Entry Tracking: Monitor attempts, creation time, and status
  • Transaction Support: Atomic operations with nested transaction support
  • Retry Policies: Configurable retry strategies with exponential backoff
  • Priority Support: Priority-based queue processing (lower value = higher priority)
  • TTL Support: Automatic entry expiration
  • Scheduled Execution: Delay processing until a specific time
  • Dead Letter Queue: Automatic handling of permanently failed entries
  • Health Checks: Monitor storage, metrics, and queue health
  • Concurrent Processing: Entry-level locking for safe multi-consumer access
  • Extensible: Custom storage backend support via StorageInterface

Storage Backends #

SQLite Storage (Built-in) #

final storage = SQLiteStorage(dbPath: 'path/to/queue.db');

Features:

  • Persistent across restarts
  • ACID compliant
  • Automatic schema management
  • FIFO guarantee
  • Data integrity
  • Transaction support with savepoints
  • Priority-based retrieval
  • TTL support

Isar Storage #

// Initialize Isar with required schemas
await Isar.initializeIsarCore(download: true);
final isar = await Isar.open([
  ...IsarStorage.requiredSchemas,
  // Add your other schemas here if needed
], directory: 'path/to/db');

// Create storage with external Isar instance
final storage = IsarStorage(isar);

// Use the storage
final manager = QueueManager(storage);

// Clean up (caller manages Isar lifecycle)
await storage.dispose(); // Releases locks but doesn't close Isar
await isar.close(); // Caller closes Isar

Features:

  • High-performance NoSQL database
  • Persistent across restarts
  • Native indexing for better performance
  • Type-safe queries
  • Cross-platform support
  • Memory efficient with lazy loading
  • Shared Isar instance support - allows multiple components to use the same database
  • External lifecycle management - caller controls when to open/close the database

Note: Isar's transaction support in DuraQ is compatibility-tracked (depth counter) rather than true nested transactions. Individual Isar write operations are atomic via writeTxn, but the beginTransaction/commitTransaction API does not provide the same ACID guarantees as SQLite's savepoint-based transactions.

Upcoming Storage Options #

  • File system storage
  • Custom storage implementations via StorageInterface

Best Practices #

  1. Resource Management
final storage = SQLiteStorage(dbPath: 'queue.db');
try {
  // Use storage
} finally {
  storage.dispose(); // Always dispose
}
  1. Type Safety
// Prefer specific types
final emailQueue = manager.queue<EmailMessage>('emails');
final jobQueue = manager.queue<BackgroundJob>('jobs');
  1. Error Handling
try {
  await queue.enqueue(item);
} catch (e) {
  // Handle storage errors
}
  1. Transaction Usage
// Simple transaction
await storage.transaction(() async {
  await queue.enqueue(item1);
  await queue.enqueue(item2);
  return null;
}); // Automatically commits or rolls back

// Nested transactions (SQLite uses savepoints)
await storage.transaction(() async {
  await queue.enqueue(item1);

  await storage.transaction(() async {
    await queue.enqueue(item2);
    return null;
  }); // Inner transaction

  return null;
}); // Outer transaction

// Manual transaction control
await storage.beginTransaction();
try {
  await queue.enqueue(item1);
  await queue.enqueue(item2);
  await storage.commitTransaction();
} catch (e) {
  await storage.rollbackTransaction();
  rethrow;
}
  1. Retry Policies
// Create a queue with exponential backoff retry
final queue = Queue<String>(
  'my-queue',
  storage,
  retryPolicy: ExponentialBackoff(
    baseDelay: Duration(milliseconds: 100),
    maxDelay: Duration(seconds: 30),
    maxAttempts: 5,
  ),
);

// Process items with automatic retry
try {
  await queue.processNext((data) async {
    await processItem(data);
    // If this throws, the entry is retried according to policy
    // After maxAttempts, the entry moves to the dead letter queue
  });
} catch (e) {
  print('Processing failed: $e');
}

Examples #

Basic Queue Operations #

// Create a queue
final queue = manager.queue<String>('notifications');

// Add items
await queue.enqueue('Notification 1');
await queue.enqueue('Notification 2');

// dequeue() returns the raw data (T?), not a QueueEntry
while (true) {
  final item = await queue.dequeue();
  if (item == null) break;

  await processNotification(item);
}

Multiple Queue Types #

// Email queue
final emailQueue = manager.queue<EmailMessage>('emails');
await emailQueue.enqueue(EmailMessage(...));

// Job queue
final jobQueue = manager.queue<BackgroundJob>('jobs');
await jobQueue.enqueue(BackgroundJob(...));

Transaction Support #

DuraQ provides robust transaction support (ACID guarantees with SQLite):

  1. Atomicity: All operations in a transaction either succeed or fail together
  2. Consistency: The database remains in a valid state before and after the transaction
  3. Isolation: Concurrent transactions don't interfere with each other
  4. Durability: Once committed, changes persist even after system failures

Transaction Methods

  • transaction<T>(Future<T> Function() operations): Executes operations in a transaction
  • beginTransaction(): Starts a manual transaction
  • commitTransaction(): Commits a manual transaction
  • rollbackTransaction(): Rolls back a manual transaction

Features

  • Automatic commit/rollback handling
  • Nested transaction support using savepoints (SQLite)
  • Manual transaction control when needed
  • Error handling with automatic rollback

Example Use Cases

  1. Atomic Multi-Queue Operations
await storage.transaction(() async {
  await emailQueue.enqueue(welcomeEmail);
  await notificationQueue.enqueue(welcomeNotification);
  await analyticsQueue.enqueue(userSignupEvent);
  return null;
});
  1. Batch Processing with Rollback
await storage.transaction(() async {
  for (final job in jobs) {
    if (!isValid(job)) {
      throw ValidationError(); // Rolls back all enqueued jobs
    }
    await jobQueue.enqueue(job);
  }
  return null;
});
  1. Complex Queue Operations
await storage.transaction(() async {
  // Process high-priority queue
  final highPriorityItem = await highPriorityQueue.dequeue();
  if (highPriorityItem != null) {
    await processQueue.enqueue(highPriorityItem);
  }

  // Process normal queue
  final normalItem = await normalQueue.dequeue();
  if (normalItem != null) {
    await processQueue.enqueue(normalItem);
  }

  return null;
});

Retry Policies #

DuraQ includes built-in retry support with configurable policies:

  1. Exponential Backoff
// Configure retry policy
final retryPolicy = ExponentialBackoff(
  baseDelay: Duration(milliseconds: 100), // Initial delay
  maxDelay: Duration(seconds: 30),        // Maximum delay
  maxAttempts: 5,                         // Maximum retry attempts
);

// Create queue with retry policy
final queue = Queue<String>('retry-queue', storage, retryPolicy: retryPolicy);

// processNext handles retry logic automatically
await queue.processNext((data) async {
  await processWithRetry(data);
});
  1. Custom Retry Policies
class CustomRetryPolicy implements RetryPolicy {
  @override
  int get maxAttempts => 3;

  @override
  bool shouldRetry(int attempts, [Object? error]) {
    // Custom retry logic
    return attempts < maxAttempts && error is RetryableError;
  }

  @override
  Duration getRetryDelay(int attempts) {
    // Custom delay calculation
    return Duration(seconds: attempts * 5);
  }

  @override
  bool shouldMoveToDeadLetter(int attempts, [Object? error]) {
    // Custom dead letter queue logic
    return attempts >= maxAttempts || error is NonRetryableError;
  }
}
  1. Error Handling with Retries
try {
  await queue.processNext((data) async {
    if (!await processItem(data)) {
      throw RetryableError('Processing failed, will retry');
    }
  });
} catch (e) {
  // Failed attempts are automatically retried
  // After maxAttempts, moves to dead letter queue
  print('Processing failed: $e');
}

Dead Letter Queue #

DuraQ provides built-in dead letter queue support for handling failed entries:

  1. Automatic Failed Entry Movement
// Create a queue with retry policy
final queue = Queue<String>(
  'my-queue',
  storage,
  retryPolicy: ExponentialBackoff(maxAttempts: 3),
);

// Failed entries move to dead letter queue after max attempts
try {
  await queue.processNext((data) async {
    if (!await processItem(data)) {
      throw ProcessingError('Failed to process item');
    }
  });
} catch (e) {
  // Entry has been moved to dead letter or scheduled for retry
}
  1. Dead Letter Queue Management
// Create a dead letter queue for a specific queue
final dlq = DeadLetterQueue<String>('my-queue', storage);

// List dead letter entries with pagination
final entries = await dlq.list(limit: 10, offset: 0);
for (final entry in entries) {
  print('Failed entry: ${entry.data}');
  print('Error: ${entry.errorMessage}');
  print('Attempts: ${entry.attempts}');
}

// Get dead letter queue length
final count = await dlq.length;
print('Dead letter entries: $count');
  1. Entry Recovery
// Retry a specific entry (resets attempts to 0, moves back to pending)
final deadLetter = await dlq.retrieve();
if (deadLetter != null) {
  await dlq.retry(deadLetter.id);
}

// Remove an entry permanently
await dlq.remove(deadLetter.id);
  1. Cleanup and Maintenance
// Purge old entries
final purged = await dlq.purgeOldEntries(Duration(days: 7));
print('Purged $purged old entries');

Features

  • Automatic movement of failed entries after retry exhaustion
  • Pagination support for listing entries
  • Entry retry capability (resets attempt counter)
  • Permanent entry removal
  • Automatic cleanup of old entries
  • Error message preservation
  • Attempt count tracking

Best Practices

  1. Regular Monitoring
// Check dead letter queue periodically
final dlq = DeadLetterQueue<String>('my-queue', storage);
final count = await dlq.length;
if (count > 0) {
  // Alert operations team
  notifyOps('Dead letter queue has $count entries');
}
  1. Cleanup Strategy
// Implement regular cleanup
final cleanupJob = Timer.periodic(Duration(days: 1), (_) async {
  final dlq = DeadLetterQueue<String>('my-queue', storage);

  // Keep entries for 30 days
  final purged = await dlq.purgeOldEntries(Duration(days: 30));

  // Log cleanup results
  logger.info('Purged $purged old dead letter entries');
});
  1. Error Analysis
// Analyze failed entries
final entries = await dlq.list();
final errorCounts = <String, int>{};

for (final entry in entries) {
  errorCounts[entry.errorMessage ?? 'Unknown'] =
    (errorCounts[entry.errorMessage] ?? 0) + 1;
}

// Report error distribution
for (final error in errorCounts.entries) {
  print('${error.key}: ${error.value} occurrences');
}
  1. Retry Strategy
// Implement selective retry
final entries = await dlq.list();
for (final entry in entries) {
  if (entry.errorMessage?.contains('Temporary failure') ?? false) {
    // Retry temporary failures (resets attempts to 0)
    await dlq.retry(entry.id);
  } else {
    // Remove permanently failed entries
    await dlq.remove(entry.id);
  }
}

Scheduled Execution #

DuraQ supports scheduling entries for future processing:

// Schedule an entry for future processing using enqueueEntry
final scheduledTime = DateTime.now().add(Duration(hours: 1));
final entry = QueueEntry<String>(
  id: 'scheduled-task-1',
  data: 'Process me later',
  createdAt: DateTime.now(),
  scheduledFor: scheduledTime,
);

await queue.enqueueEntry(entry);

// The entry won't be retrieved until the scheduled time
final nextItem = await queue.dequeue(); // Returns null if no entries are ready

Features

  • Schedule entries for future processing
  • Entries remain in queue but won't be retrieved before their scheduled time
  • Combines with priority support (priority order applies once scheduled time is reached)
  • Perfect for:
    • Delayed email notifications
    • Scheduled background jobs
    • Time-based workflow triggers
    • Deferred processing

Example Use Cases

  1. Delayed Notifications
final reminderTime = DateTime.now().add(Duration(days: 1));
await notificationQueue.enqueueEntry(QueueEntry(
  id: 'reminder-1',
  data: 'Follow-up reminder email',
  createdAt: DateTime.now(),
  scheduledFor: reminderTime,
));
  1. Scheduled Tasks
final midnightTonight = DateTime.now().copyWith(
  hour: 0,
  minute: 0,
  second: 0,
  millisecond: 0,
).add(Duration(days: 1));

await jobQueue.enqueueEntry(QueueEntry(
  id: 'nightly-job-1',
  data: 'Run nightly maintenance',
  createdAt: DateTime.now(),
  scheduledFor: midnightTonight,
));
  1. Time-based Workflow
final now = DateTime.now();
final workflow = [
  QueueEntry(
    id: 'step1',
    data: 'Send welcome email',
    createdAt: now,
    scheduledFor: now,
  ),
  QueueEntry(
    id: 'step2',
    data: 'Send follow-up survey',
    createdAt: now,
    scheduledFor: now.add(Duration(days: 7)),
  ),
  QueueEntry(
    id: 'step3',
    data: 'Send engagement reminder',
    createdAt: now,
    scheduledFor: now.add(Duration(days: 14)),
  ),
];

await storage.transaction(() async {
  for (final step in workflow) {
    await workflowQueue.enqueueEntry(step);
  }
  return null;
});

Concurrent Processing #

DuraQ provides built-in support for concurrent processing with entry-level locking:

// Multiple consumers can safely process queue entries
final consumer1 = QueueManager(storage);
final consumer2 = QueueManager(storage);

// Each consumer gets a different entry - entries are locked while processing
final item1 = await consumer1.queue<String>('jobs').dequeue();
final item2 = await consumer2.queue<String>('jobs').dequeue();

Features

  • Entry-level locking for safe concurrent processing
  • Automatic lock cleanup for expired locks (default: 5 minute timeout)
  • Combines with other features:
    • Priority-based processing
    • Scheduled execution
    • Transaction support
    • Retry policies

Example Use Cases

  1. Multiple Workers
void startWorker(String name, SQLiteStorage storage) async {
  final queue = Queue<String>('jobs', storage);

  while (true) {
    try {
      final processed = await queue.processNext((data) async {
        await processJob(data);
      });
      if (!processed) {
        await Future.delayed(Duration(seconds: 1));
      }
    } catch (e) {
      // processNext handles retry/dead-letter automatically
      print('$name: Processing failed: $e');
    }
  }
}

// Start multiple workers sharing the same storage
final storage = SQLiteStorage(dbPath: 'queue.db');
startWorker('worker1', storage);
startWorker('worker2', storage);
startWorker('worker3', storage);
  1. Safe Concurrent Processing
final queue = Queue<String>('orders', storage);

// Multiple processors can safely run in parallel
await Future.wait([
  processOrders(queue),
  processOrders(queue),
  processOrders(queue),
]);

Future<void> processOrders(Queue<String> queue) async {
  while (true) {
    final processed = await queue.processNext((data) async {
      await fulfillOrder(data);
    });
    if (!processed) break; // No more items
  }
}
  1. Health Monitoring
// Periodically check for stuck entries using SQLiteStorage directly
final sqliteStorage = storage as SQLiteStorage;
final stuckEntries = await sqliteStorage.getEntriesByStatus(
  'jobs',
  EntryStatus.processing,
);

for (final entry in stuckEntries) {
  final processingTime = DateTime.now().difference(entry.lastUpdatedAt);
  if (processingTime > Duration(hours: 1)) {
    // Alert operations about stuck entry
    notifyOps('Entry ${entry.id} stuck in processing');
  }
}

Note: getEntriesByStatus() is available on SQLiteStorage and IsarStorage directly, but is not part of the StorageInterface contract.

Health Checks #

DuraQ provides comprehensive health monitoring capabilities to ensure your queue system is operating correctly. The health check system monitors three main components:

  1. Storage Health: Verifies that the storage backend is responsive and functioning properly.
  2. Metrics Health: Ensures the metrics collection system is operational.
  3. Queue Health: Monitors queue performance metrics like error rates and processing times.

Usage #

// Create health checks
final healthChecks = HealthCheckAggregator([
  StorageHealthCheck(storage),
  MetricsHealthCheck(metrics),
  QueueHealthCheck(
    storage,
    metrics,
    errorRateWindow: Duration(minutes: 5),
    maxErrorRate: 0.1, // 10% threshold
  ),
]);

// Check overall health
final status = await healthChecks.getOverallStatus();
if (status == HealthStatus.healthy) {
  print('All systems operational');
}

// Get detailed health information (checks run in parallel)
final results = await healthChecks.checkAll();
for (final result in results.values) {
  print('${result.component}: ${result.status}');
  print('Message: ${result.message}');
  print('Details: ${result.details}');
}

Health Status Levels #

  • Healthy: All components are functioning normally
  • Degraded: System is operational but showing signs of stress (e.g., high error rate)
  • Unhealthy: One or more components have failed

Monitored Metrics #

  • Error rates
  • Queue size
  • Average processing time
  • Storage responsiveness
  • Metrics system status

Contributing #

Contributions are welcome! Please read our contributing guidelines and submit pull requests to our repository.

License #

MIT

See Also #

1
likes
110
points
90
downloads

Documentation

API reference

Publisher

verified publisherwerkswinkel.com

Weekly Downloads

A durable queuing system implemented in Dart

Repository (GitHub)
View/report issues

License

MIT (license)

Dependencies

isar, path, sqlite3, uuid

More

Packages that depend on duraq