kiss_queue 0.0.1

A simple, backend-agnostic queue interface for Dart

kiss_queue

A simple, backend-agnostic queue interface for Dart, part of the KISS (Keep It Simple, Stupid) family of libraries.

🎯 Purpose

kiss_queue provides a unified, async queue interface that works with any backend. Whether you're using in-memory queues for development, cloud-scale message queuing for production, or custom database-backed queues, this library gives you a consistent API with reliability features such as visibility timeouts, dead letter queues, and message expiration.

Just queues. No ceremony. No complexity.


✨ Features

  • 🔄 Backend Agnostic: Unified interface works with any queue implementation
  • ⚡ Production Ready: Visibility timeouts, dead letter queues, message expiration
  • 🧪 Comprehensive Testing: Built-in test suite for validating any implementation
  • 📊 Enterprise Ready: Dead letter queues, visibility timeouts, message expiration
  • 🚀 High Performance: Optimized interface for maximum throughput
  • 🛡️ Reliable: SQS-like behavior with automatic message reprocessing
  • 🎯 Simple API: Minimal interface - enqueue, dequeue, acknowledge, reject
  • 📦 Minimal Dependencies: Pure Dart implementation (depends only on uuid for message IDs and collection)

🚀 Quick Start

Basic Usage

import 'package:kiss_queue/kiss_queue.dart';

void main() async {
  // Create a queue factory
  final factory = InMemoryQueueFactory();
  
  // Create a queue
  final queue = await factory.createQueue<String>('my-queue');
  
  // Enqueue a message
  await queue.enqueue(QueueMessage.create('Hello, World!'));
  
  // Dequeue and process
  final message = await queue.dequeue();
  if (message != null) {
    print('Received: ${message.payload}');
    await queue.acknowledge(message.id);
  }
  
  // Cleanup
  factory.disposeAll();
}

Advanced Usage with Error Handling

import 'package:kiss_queue/kiss_queue.dart';

class Order {
  final String orderId;
  final double amount;
  
  Order(this.orderId, this.amount);
}

void main() async {
  final factory = InMemoryQueueFactory();
  
  // Create main queue with dead letter queue for failed messages
  final deadLetterQueue = await factory.createQueue<Order>('failed-orders');
  final orderQueue = await factory.createQueue<Order>(
    'orders',
    configuration: QueueConfiguration.highThroughput,
    deadLetterQueue: deadLetterQueue,
  );
  
  // Enqueue an order
  final order = Order('ORD-123', 99.99);
  await orderQueue.enqueue(QueueMessage.create(order));
  
  // Process with error handling
  final message = await orderQueue.dequeue();
  if (message != null) {
    try {
      await processOrder(message.payload);
      await orderQueue.acknowledge(message.id);
    } catch (e) {
      // Reject and requeue for retry (will move to DLQ after max attempts)
      await orderQueue.reject(message.id, requeue: true);
    }
  }
  
  // Queue operations completed successfully
  print('Order processing complete!');
  
  factory.disposeAll();
}

Future<void> processOrder(Order order) async {
  // Your order processing logic here
  print('Processing order ${order.orderId} for \$${order.amount}');
}

🏗️ Architecture

Core Interface

abstract class Queue<T> {
  // Queue configuration and dead letter queue  
  QueueConfiguration get configuration;
  Queue<T>? get deadLetterQueue;
  
  // Core operations
  Future<void> enqueue(QueueMessage<T> message);
  Future<QueueMessage<T>?> dequeue();
  Future<void> acknowledge(String messageId);
  Future<QueueMessage<T>?> reject(String messageId, {bool requeue = true});
  
  // Cleanup
  void dispose();
}

Message Lifecycle

  1. Enqueue: Add message to queue
  2. Dequeue: Retrieve message (becomes invisible to other consumers)
  3. Process: Handle the message in your application
  4. Acknowledge: Mark message as successfully processed (removes from queue)
  5. Reject: Mark message as failed (can requeue for retry or move to DLQ)

Built-in Reliability Features

  • Visibility Timeout: A dequeued message becomes invisible to other consumers and is automatically restored if it is not acknowledged before the timeout expires
  • Dead Letter Queue: Failed messages move to the DLQ after the maximum number of receive attempts (maxReceiveCount)
  • Message Expiration: Optional TTL for automatic message cleanup
  • Receive Count Tracking: Monitor how many times a message has been received
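
To make the interplay between visibility timeouts, receive counts, and the dead letter queue concrete, here is a minimal toy model in plain Dart. It is a sketch only: ToyMessage and ToyQueue are hypothetical names invented for this illustration, they are not part of kiss_queue, and the exact point at which a real implementation dead-letters a message may differ.

```dart
/// Toy message record for illustration; NOT the library's QueueMessage.
class ToyMessage {
  ToyMessage(this.id, this.payload);
  final String id;
  final String payload;
  int receiveCount = 0;
  DateTime? invisibleUntil; // null means the message is visible
}

/// Toy queue showing visibility timeout, receive counting and a DLQ.
class ToyQueue {
  ToyQueue({required this.visibilityTimeout, required this.maxReceiveCount});

  final Duration visibilityTimeout;
  final int maxReceiveCount;
  final List<ToyMessage> _messages = [];
  final List<ToyMessage> deadLetters = [];

  void enqueue(ToyMessage message) => _messages.add(message);

  /// Returns the first visible message, or null if none is visible.
  /// [now] is passed in so the example does not need to sleep.
  ToyMessage? dequeue(DateTime now) {
    // Iterate over a copy because dead-lettering removes from the list.
    for (final m in List.of(_messages)) {
      final hiddenUntil = m.invisibleUntil;
      if (hiddenUntil != null && now.isBefore(hiddenUntil)) continue;
      if (m.receiveCount >= maxReceiveCount) {
        // Delivered too many times without an acknowledge: dead-letter it.
        _messages.remove(m);
        deadLetters.add(m);
        continue;
      }
      m.receiveCount++;
      m.invisibleUntil = now.add(visibilityTimeout);
      return m;
    }
    return null;
  }

  /// Acknowledging removes the message permanently.
  void acknowledge(String id) => _messages.removeWhere((m) => m.id == id);
}

void main() {
  final queue = ToyQueue(
    visibilityTimeout: const Duration(seconds: 30),
    maxReceiveCount: 2,
  );
  queue.enqueue(ToyMessage('m1', 'hello'));

  final t0 = DateTime.utc(2024, 1, 1);
  print(queue.dequeue(t0)?.id); // first delivery
  print(queue.dequeue(t0.add(const Duration(seconds: 10)))?.id); // hidden
  print(queue.dequeue(t0.add(const Duration(seconds: 31)))?.id); // redelivered
  print(queue.dequeue(t0.add(const Duration(seconds: 62)))?.id); // dead-lettered
  print(queue.deadLetters.length);
}
```

Because the message is never acknowledged, it reappears once the timeout passes and ends up in the dead letter list after two deliveries. Calling acknowledge after the first dequeue would remove it instead.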

📦 Implementations

In-Memory Queue (Included)

Perfect for development, testing, and single-instance applications:

final factory = InMemoryQueueFactory();
final queue = await factory.createQueue<MyData>('my-queue');

Custom Implementations

The generic interface makes it easy to implement queues for any backend:

  • Cloud Providers: Cloud-scale message queuing
  • Google Cloud Pub/Sub: Global message distribution
  • Redis: High-performance in-memory queuing
  • PostgreSQL/MySQL: Database-backed persistence
  • Apache Kafka: High-throughput event streaming
  • RabbitMQ: Feature-rich message broker
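
As a rough starting point for a custom backend, the skeleton below implements only the members listed under Core Interface. It is a sketch under assumptions: BackendClient and BackendQueue are hypothetical names, configuration.visibilityTimeout is assumed to be a readable Duration field, the value that reject should return is a guess, and the real Queue interface may declare members not shown in this README.

```dart
import 'package:kiss_queue/kiss_queue.dart';

/// Hypothetical driver for your backend (Redis, SQL, a broker, ...).
/// Nothing in this class comes from kiss_queue itself.
abstract class BackendClient<T> {
  Future<void> push(QueueMessage<T> message);
  Future<QueueMessage<T>?> popVisible(Duration visibilityTimeout);
  Future<void> delete(String messageId);
  Future<QueueMessage<T>?> release(String messageId);
  void close();
}

/// Adapter that forwards each Queue operation to the hypothetical client.
class BackendQueue<T> implements Queue<T> {
  BackendQueue(this._client, this.configuration, {this.deadLetterQueue});

  final BackendClient<T> _client;

  @override
  final QueueConfiguration configuration;

  @override
  final Queue<T>? deadLetterQueue;

  @override
  Future<void> enqueue(QueueMessage<T> message) => _client.push(message);

  @override
  Future<QueueMessage<T>?> dequeue() =>
      // Assumption: visibilityTimeout is exposed as a non-null Duration.
      _client.popVisible(configuration.visibilityTimeout);

  @override
  Future<void> acknowledge(String messageId) => _client.delete(messageId);

  @override
  Future<QueueMessage<T>?> reject(
    String messageId, {
    bool requeue = true,
  }) async {
    // Assumption: requeue makes the message visible again and returns it;
    // otherwise the message is dropped. Dead-letter handling is omitted.
    if (requeue) return _client.release(messageId);
    await _client.delete(messageId);
    return null;
  }

  @override
  void dispose() => _client.close();
}
```

A real implementation would also need to track receive counts and forward exhausted messages to deadLetterQueue. The bundled test suite is the intended way to check that behavior.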

🧪 Testing Your Implementation

kiss_queue includes a comprehensive test suite that can validate any implementation with just a few lines of code:

// test/my_implementation_test.dart
import 'package:kiss_queue/kiss_queue.dart';
import 'implementation_tester.dart';

void main() {
  final factory = MyCustomQueueFactory();
  final tester = ImplementationTester('MyCustomQueue', factory, () {
    factory.disposeAll(); // Your cleanup logic
  });
  
  tester.run(); // That's it! 25+ comprehensive tests will run
}

Test Coverage: 25+ tests covering functionality, performance, concurrency, and edge cases.

Steps to Test Your Implementation

  1. Implement your QueueFactory (with your custom Queue implementation)
  2. Create a test file with ImplementationTester
  3. Run: dart test test/my_implementation_test.dart

That's it! The ImplementationTester automatically runs both functional and performance tests with appropriate configurations.

⚙️ Configuration

Predefined Configurations

// Fast, high-volume (development/testing)
QueueConfiguration.inMemory

// Balanced (cloud services)  
QueueConfiguration.cloud

// High throughput (production)
QueueConfiguration.highThroughput

// Quick testing
QueueConfiguration.testing

Custom Configuration

const myConfig = QueueConfiguration(
  maxReceiveCount: 5,                              // Max retries before DLQ
  visibilityTimeout: Duration(minutes: 5),         // Processing timeout
  messageRetentionPeriod: Duration(hours: 24),     // Message TTL
);
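
A custom configuration is presumably passed to the factory the same way the predefined ones are in the Quick Start, through the configuration parameter of createQueue. The sketch below assumes that, and also assumes maxReceiveCount is readable on the configuration object. The queue name and the values are illustrative only.

```dart
import 'package:kiss_queue/kiss_queue.dart';

// Illustrative values, not recommendations.
const retryConfig = QueueConfiguration(
  maxReceiveCount: 3, // give up after three deliveries
  visibilityTimeout: Duration(seconds: 30),
  messageRetentionPeriod: Duration(hours: 1),
);

Future<void> main() async {
  final factory = InMemoryQueueFactory();

  // 'jobs' is a hypothetical queue name.
  final queue = await factory.createQueue<String>(
    'jobs',
    configuration: retryConfig,
  );

  // The Queue interface exposes the configuration it was created with.
  print(queue.configuration.maxReceiveCount);

  factory.disposeAll();
}
```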

📊 Performance

The kiss_queue interface is designed for high-performance implementations:

  • ✅ Thread-safe operations
  • ✅ Multiple consumer support
  • ✅ Async-first design for scalability
  • ✅ Minimal overhead interface

🛠️ Installation

Add to your pubspec.yaml:

dependencies:
  kiss_queue: ^0.0.1

Then run:

dart pub get

📚 API Reference

QueueMessage

// Auto-generated UUID ID and timestamp (most common)
QueueMessage.create(payload)

// With custom ID generation function
QueueMessage.create(payload, idGenerator: () => 'MSG-${DateTime.now().millisecondsSinceEpoch}')

// With optional parameters
QueueMessage(payload: data, id: customId, createdAt: timestamp)

// With explicit ID (useful for testing)
QueueMessage.withId(id: 'custom-123', payload: data)

Custom ID Generation Examples

// Sequential counter
int messageCounter = 1000;
QueueMessage.create(data, idGenerator: () => 'MSG-${messageCounter++}')

// Timestamp-based
QueueMessage.create(data, idGenerator: () => 'TS-${DateTime.now().millisecondsSinceEpoch}')

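// Prefixed UUID (requires: import 'package:uuid/uuid.dart';)
QueueMessage.create(data, idGenerator: () => 'ORDER-${Uuid().v4()}')

// Custom format (requires: import 'dart:math'; userId is your own variable)
QueueMessage.create(data, idGenerator: () => '${userId}-${Random().nextInt(10000)}')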

Error Handling

try {
  await queue.acknowledge('non-existent-id');
} on MessageNotFoundError catch (e) {
  print('Message not found: ${e.messageId}');
}
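
In consumer code it can be convenient to treat a missing message as a normal outcome rather than an exception, for example when an acknowledgement arrives after the message was already handled elsewhere. The helper below is a small sketch of that idea. tryAcknowledge is a hypothetical name, and whether a late acknowledgement actually raises MessageNotFoundError depends on the queue implementation.

```dart
import 'package:kiss_queue/kiss_queue.dart';

/// Acknowledges [messageId] on [queue].
/// Returns false instead of throwing when the queue does not know the ID.
Future<bool> tryAcknowledge<T>(Queue<T> queue, String messageId) async {
  try {
    await queue.acknowledge(messageId);
    return true;
  } on MessageNotFoundError {
    // The ID is unknown to the queue; report it to the caller as a result.
    return false;
  }
}
```

A caller can then log or count the false case without wrapping every acknowledge call in its own try block.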

🤝 Contributing

We welcome contributions! Please see our contributing guidelines for details.

Running Tests

# Run all tests
dart test

# Run specific implementation tests
dart test test/in_memory_test.dart

# Run performance benchmarks
dart test test/performance_test.dart

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🌟 Why kiss_queue?

  • Simple: Minimal API surface, easy to understand
  • Reliable: Battle-tested patterns from cloud message queuing services
  • Flexible: Works with any backend via clean interface
  • Performant: Optimized for high throughput and low latency
  • Testable: Comprehensive test suite included
  • Production Ready: Used in production applications

Perfect for microservices, event-driven architectures, background job processing, and any application that needs reliable async message processing.


Built with ❤️ by the WAMF team. Part of the KISS family of simple, focused Dart packages.


Dependencies

collection, uuid
