kiss_queue 0.1.0

A simple, backend-agnostic queue interface for Dart

kiss_queue #

A simple, backend-agnostic queue interface for Dart, part of the KISS (Keep It Simple, Stupid) family of libraries.

🎯 Purpose #

kiss_queue provides a unified, async queue interface that works with any backend. Whether you use in-memory queues for development, cloud-scale message queuing for production, or custom database-backed queues, the library gives you one consistent API with reliability features such as visibility timeouts, dead letter queues, and message expiration.

Just queues. No ceremony. No complexity.


✨ Features #

  • 🔄 Backend Agnostic: Unified interface works with any queue implementation
  • ⚡ Production Ready: Visibility timeouts, dead letter queues, message expiration
  • 🔌 Serialization Support: Pluggable serialization for any data format (JSON, Binary, etc.)
  • 🧪 Comprehensive Testing: Built-in test suite for validating any implementation
  • 📊 Enterprise Ready: Dead letter queues, visibility timeouts, message expiration
  • 🚀 High Performance: Optimized interface for maximum throughput
  • 🛡️ Reliable: SQS-like behavior with automatic message reprocessing
  • 🎯 Simple API: Minimal interface - enqueue, dequeue, acknowledge, reject
  • 📦 Minimal Dependencies: Pure Dart implementation (depends only on uuid for message IDs and collection)

🚀 Quick Start #

Basic Usage #

import 'package:kiss_queue/kiss_queue.dart';

void main() async {
  // Create a queue factory
  final factory = InMemoryQueueFactory();
  
  // Create a queue
  final queue = await factory.createQueue<String, String>('my-queue');
  
  // Enqueue a message (two equivalent ways)
  await queue.enqueue(QueueMessage.create('Hello, World!'));
  await queue.enqueuePayload('Hello, simplified!'); // Shorthand
  
  // Dequeue and process
  final message = await queue.dequeue();
  if (message != null) {
    print('Received: ${message.payload}');
    await queue.acknowledge(message.id);
  }
  
  // Cleanup
  factory.disposeAll();
}

Serialization Example #

import 'dart:convert';
import 'package:kiss_queue/kiss_queue.dart';

class Order {
  final String orderId;
  final double amount;
  Order(this.orderId, this.amount);
  
  // Serialization methods
  Map<String, dynamic> toJson() => {'orderId': orderId, 'amount': amount};
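  // Explicit casts: JSON numbers may decode as int, so convert via num
  static Order fromJson(Map<String, dynamic> json) =>
      Order(json['orderId'] as String, (json['amount'] as num).toDouble());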
}

// Custom JSON serializer
class OrderJsonSerializer implements MessageSerializer<Order, String> {
  @override
  String serialize(Order payload) => jsonEncode(payload.toJson());
  
  @override
  Order deserialize(String data) => Order.fromJson(jsonDecode(data));
}

void main() async {
  final factory = InMemoryQueueFactory();
  
  // Create queue with serialization
  final queue = await factory.createQueue<Order, String>(
    'order-queue',
    serializer: OrderJsonSerializer(),
  );
  
  // Both methods work with serialization
  final order = Order('ORD-123', 99.99);
  await queue.enqueuePayload(order);              // Serializes automatically
  await queue.enqueue(QueueMessage.create(order)); // Also serializes
  
  // Dequeue automatically deserializes
  final message = await queue.dequeue();
  if (message != null) {
    print('Order: ${message.payload.orderId}'); // Fully typed Order object
    await queue.acknowledge(message.id);
  }
  
  factory.disposeAll();
}

Advanced Usage with Error Handling #

import 'package:kiss_queue/kiss_queue.dart';

class Order {
  final String orderId;
  final double amount;
  
  Order(this.orderId, this.amount);
}

void main() async {
  final factory = InMemoryQueueFactory();
  
  // Create main queue with dead letter queue for failed messages
  final deadLetterQueue = await factory.createQueue<Order, Order>('failed-orders');
  final orderQueue = await factory.createQueue<Order, Order>(
    'orders',
    configuration: QueueConfiguration.highThroughput,
    deadLetterQueue: deadLetterQueue,
  );
  
  // Enqueue an order
  final order = Order('ORD-123', 99.99);
  await orderQueue.enqueuePayload(order); // Simple payload enqueue
  
  // Process with error handling
  final message = await orderQueue.dequeue();
  if (message != null) {
    try {
      await processOrder(message.payload);
      await orderQueue.acknowledge(message.id);
    } catch (e) {
      // Reject and requeue for retry (will move to DLQ after max attempts)
      await orderQueue.reject(message.id, requeue: true);
    }
  }
  
  // Queue operations completed successfully
  print('Order processing complete!');
  
  factory.disposeAll();
}

Future<void> processOrder(Order order) async {
  // Your order processing logic here
  print('Processing order ${order.orderId} for \$${order.amount}');
}

🏗️ Architecture #

Core Interface #

abstract class Queue<T, S> {
  // Queue configuration and dead letter queue  
  QueueConfiguration get configuration;
  Queue<T, S>? get deadLetterQueue;
  MessageSerializer<T, S>? get serializer;
  
  // Core operations
  Future<void> enqueue(QueueMessage<T> message);
  Future<void> enqueuePayload(T payload);           // Shorthand helper
  Future<QueueMessage<T>?> dequeue();
  Future<void> acknowledge(String messageId);
  Future<QueueMessage<T>?> reject(String messageId, {bool requeue = true});
  
  // Cleanup
  void dispose();
}

Serialization Interface #

abstract class MessageSerializer<T, S> {
  /// Serialize payload to storage format
  S serialize(T payload);
  
  /// Deserialize from storage format back to payload
  T deserialize(S data);
}

Message Lifecycle #

  1. Enqueue: Add message to queue (with optional serialization)
  2. Dequeue: Retrieve message (becomes invisible to other consumers, with optional deserialization)
  3. Process: Handle the message in your application
  4. Acknowledge: Mark message as successfully processed (removes from queue)
  5. Reject: Mark message as failed (can requeue for retry or move to DLQ)

Built-in Reliability Features #

  • Visibility Timeout: Messages become invisible after dequeue, automatically restored if not acknowledged
  • Dead Letter Queue: Failed messages move to DLQ after max retry attempts
  • Message Expiration: Optional TTL for automatic message cleanup
  • Receive Count Tracking: Monitor how many times a message has been processed
  • Serialization Support: Automatic serialization/deserialization with pluggable serializers
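To make the visibility timeout, receive count, and dead letter behavior listed above concrete, here is a minimal, self-contained sketch in plain Dart. It is not kiss_queue's implementation: SketchQueue, SketchMessage, and the injectable clock are hypothetical names invented for this illustration, and the exact point at which the real library moves a message to the DLQ may differ.

```dart
// Hypothetical, simplified model of SQS-like redelivery. Not the library's code.
class SketchMessage<T> {
  SketchMessage(this.id, this.payload);

  final String id;
  final T payload;
  int receiveCount = 0; // how many times the message has been handed out
  DateTime? invisibleUntil; // null means the message is currently visible
}

class SketchQueue<T> {
  SketchQueue({
    required this.visibilityTimeout,
    required this.maxReceiveCount,
    DateTime Function()? clock, // injectable so the behavior can be tested
  }) : _now = clock ?? (() => DateTime.now());

  final Duration visibilityTimeout;
  final int maxReceiveCount;
  final DateTime Function() _now;

  final List<SketchMessage<T>> _messages = [];
  final List<SketchMessage<T>> deadLetters = [];
  int _nextId = 0;

  int get length => _messages.length;

  void enqueue(T payload) {
    _messages.add(SketchMessage<T>('msg-${_nextId++}', payload));
  }

  /// Returns the first visible message and hides it for [visibilityTimeout].
  SketchMessage<T>? dequeue() {
    final now = _now();
    // Iterate over a copy because exhausted messages are removed on the way.
    for (final message in List.of(_messages)) {
      final hiddenUntil = message.invisibleUntil;
      if (hiddenUntil != null && now.isBefore(hiddenUntil)) {
        continue; // still being processed by another consumer
      }
      if (message.receiveCount >= maxReceiveCount) {
        // Retries exhausted: move to the dead letter list instead of redelivering.
        _messages.remove(message);
        deadLetters.add(message);
        continue;
      }
      message.receiveCount++;
      message.invisibleUntil = now.add(visibilityTimeout);
      return message;
    }
    return null;
  }

  /// Acknowledging removes the message for good.
  void acknowledge(String id) {
    _messages.removeWhere((m) => m.id == id);
  }
}
```

In this sketch, with a visibilityTimeout of 30 seconds and a maxReceiveCount of 2, a message that is never acknowledged is handed out twice and then lands in deadLetters on the next dequeue attempt after the second timeout has passed.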

🔌 Serialization #

Built-in Serialization Patterns #

The queue supports flexible serialization through the MessageSerializer<T, S> interface, where:

  • T is your payload type (e.g., Order, User)
  • S is the storage format (e.g., String, Map<String, dynamic>, List<int>)
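As an illustration of a storage format other than String, the sketch below maps a payload to Map<String, dynamic>. The MessageSerializer declaration is a local copy of the interface shown in this README so that the snippet compiles on its own; in a real project you would import it from package:kiss_queue instead. User and UserMapSerializer are hypothetical names used only for this example.

```dart
// Local copy of the interface documented above, for a self-contained sketch.
abstract class MessageSerializer<T, S> {
  S serialize(T payload);
  T deserialize(S data);
}

// Hypothetical payload type (T).
class User {
  const User(this.name, this.age);

  final String name;
  final int age;
}

// T = User, S = Map<String, dynamic>: suits backends that store structured
// documents rather than raw strings.
class UserMapSerializer
    implements MessageSerializer<User, Map<String, dynamic>> {
  @override
  Map<String, dynamic> serialize(User payload) =>
      {'name': payload.name, 'age': payload.age};

  @override
  User deserialize(Map<String, dynamic> data) =>
      User(data['name'] as String, data['age'] as int);
}
```

A round trip through serialize and deserialize should yield a User with the same field values, which is a cheap property to check in a unit test for any serializer you write.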

Common Serialization Examples #

JSON String Serialization

// Requires: import 'dart:convert'; (for jsonEncode / jsonDecode)
class JsonStringSerializer<T> implements MessageSerializer<T, String> {
  final T Function(Map<String, dynamic>) fromJson;
  final Map<String, dynamic> Function(T) toJson;
  
  JsonStringSerializer({required this.fromJson, required this.toJson});
  
  @override
  String serialize(T payload) => jsonEncode(toJson(payload));
  
  @override
  T deserialize(String data) => fromJson(jsonDecode(data));
}

// Usage
final queue = await factory.createQueue<Order, String>(
  'orders',
  serializer: JsonStringSerializer<Order>(
    fromJson: Order.fromJson,
    toJson: (order) => order.toJson(),
  ),
);

Binary Serialization

// Requires: import 'dart:convert'; (for utf8)
class BinarySerializer<T> implements MessageSerializer<T, List<int>> {
  final JsonStringSerializer<T> _jsonSerializer;
  
  BinarySerializer(this._jsonSerializer);
  
  @override
  List<int> serialize(T payload) {
    final jsonString = _jsonSerializer.serialize(payload);
    return utf8.encode(jsonString);
  }
  
  @override
  T deserialize(List<int> data) {
    final jsonString = utf8.decode(data);
    return _jsonSerializer.deserialize(jsonString);
  }
}

No Serialization (Direct Storage)

// When T == S, no serializer is needed
final queue = await factory.createQueue<String, String>('simple-queue');
await queue.enqueuePayload('Direct string storage');

Serialization Error Handling #

try {
  await queue.enqueuePayload(complexObject);
} on SerializationError catch (e) {
  print('Failed to serialize: ${e.message}');
}

try {
  final message = await queue.dequeue();
} on DeserializationError catch (e) {
  print('Failed to deserialize: ${e.message}');
  print('Raw data: ${e.data}');
}

📦 Implementations #

In-Memory Queue (Included) #

Perfect for development, testing, and single-instance applications:

final factory = InMemoryQueueFactory();
final queue = await factory.createQueue<MyData, String>(
  'my-queue',
  serializer: MySerializer(),
);

Custom Implementations #

The generic interface can be implemented for any backend. These implementations are not bundled with the package; typical targets include:

  • Cloud Providers: Cloud-scale message queuing with built-in serialization
  • Google Cloud Pub/Sub: Global message distribution
  • Redis: High-performance in-memory queuing
  • PostgreSQL/MySQL: Database-backed persistence with JSON/binary serialization
  • Apache Kafka: High-throughput event streaming
  • RabbitMQ: Feature-rich message broker

🧪 Testing Your Implementation #

kiss_queue includes a comprehensive test suite that can validate any implementation with just a few lines of code:

// test/my_implementation_test.dart
import 'package:kiss_queue/kiss_queue.dart';
import 'implementation_tester.dart';

void main() {
  final factory = MyCustomQueueFactory();
  final tester = ImplementationTester('MyCustomQueue', factory, () {
    factory.disposeAll(); // Your cleanup logic
  });
  
  tester.run(); // That's it! 87 comprehensive tests will run
}

Test Coverage: 87 tests covering functionality, serialization, performance, concurrency, and edge cases.

Comprehensive Test Coverage #

  • ✅ Core functionality (enqueue, enqueuePayload, dequeue, acknowledge, reject)
  • ✅ Serialization testing (JSON, binary, Map serializers + error handling)
  • ✅ Performance benchmarks (throughput, latency, concurrency)
  • ✅ Edge cases (timeouts, retries, dead letters, non-existent messages)
  • ✅ Factory management (create, get, delete queues)
  • ✅ Queue equivalence (both enqueue methods produce identical results)

Steps to Test Your Implementation #

  1. Implement your QueueFactory (with your custom Queue implementation)
  2. Create a test file with ImplementationTester
  3. Run: dart test test/my_implementation_test.dart

That's it! The ImplementationTester automatically runs both functional and performance tests with appropriate configurations.

⚙️ Configuration #

Predefined Configurations #

// Fast, high-volume (development/testing)
QueueConfiguration.inMemory

// Balanced (cloud services)  
QueueConfiguration.cloud

// High throughput (production)
QueueConfiguration.highThroughput

// Quick testing
QueueConfiguration.testing

Custom Configuration #

const myConfig = QueueConfiguration(
  maxReceiveCount: 5,                              // Max retries before DLQ
  visibilityTimeout: Duration(minutes: 5),         // Processing timeout
  messageRetentionPeriod: Duration(hours: 24),     // Message TTL
);
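When picking these values it helps to estimate how long a message that always fails will circulate before it reaches the DLQ. The helper below is a rough sketch, not part of kiss_queue: it assumes the message is dequeued again as soon as it becomes visible, is never acknowledged or rejected, and that maxReceiveCount counts deliveries. The function names are hypothetical.

```dart
/// Rough estimate of how long an always-failing message keeps being retried:
/// one visibility timeout per delivery.
Duration estimatedTimeToDeadLetter({
  required int maxReceiveCount,
  required Duration visibilityTimeout,
}) =>
    visibilityTimeout * maxReceiveCount;

/// Checks that all retries can happen before the message would expire.
bool retriesFitWithinRetention({
  required int maxReceiveCount,
  required Duration visibilityTimeout,
  required Duration messageRetentionPeriod,
}) =>
    estimatedTimeToDeadLetter(
      maxReceiveCount: maxReceiveCount,
      visibilityTimeout: visibilityTimeout,
    ) <=
    messageRetentionPeriod;
```

For the configuration above (5 receives, 5 minute visibility timeout), the estimate is 25 minutes, comfortably inside the 24 hour retention period.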

📊 Performance #

The kiss_queue interface is designed for high-performance implementations:

  • ✅ Thread-safe operations
  • ✅ Multiple consumer support
  • ✅ Async-first design for scalability
  • ✅ Minimal overhead interface
  • ✅ Optional serialization (no overhead when T == S)

🛠️ Installation #

Add to your pubspec.yaml:

dependencies:
  kiss_queue: ^0.1.0

Then run:

dart pub get

API Reference #

QueueMessage #

// Auto-generated UUID ID and timestamp (most common)
QueueMessage.create(payload)

// With custom ID generation function
QueueMessage.create(payload, idGenerator: () => 'MSG-${DateTime.now().millisecondsSinceEpoch}')

// With optional parameters
QueueMessage(payload: data, id: customId, createdAt: timestamp)

// With explicit ID (useful for testing)
QueueMessage.withId(id: 'custom-123', payload: data)

Queue Operations #

// Enqueue with full QueueMessage control
await queue.enqueue(QueueMessage.create(payload));
await queue.enqueue(QueueMessage.withId(id: 'custom-id', payload: payload));

// Enqueue payload directly (uses queue's configured idGenerator)
await queue.enqueuePayload(payload);

// Both methods are equivalent and work with serialization

Custom ID Generation Examples

// Sequential counter
int messageCounter = 1000;
final queue = await factory.createQueue<Order, String>(
  'orders',
  idGenerator: () => 'MSG-${messageCounter++}',
  serializer: OrderSerializer(),
);

// Timestamp-based
QueueMessage.create(data, idGenerator: () => 'TS-${DateTime.now().millisecondsSinceEpoch}')

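// Prefixed UUID (requires: import 'package:uuid/uuid.dart';)
QueueMessage.create(data, idGenerator: () => 'ORDER-${Uuid().v4()}')

// Custom format (requires: import 'dart:math'; userId is a variable from your own code)
QueueMessage.create(data, idGenerator: () => '$userId-${Random().nextInt(10000)}')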
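The sequential counter example above relies on a mutable top-level variable. One possible alternative, sketched here in plain Dart, keeps the counter inside a closure. sequentialIdGenerator is a hypothetical helper, not part of kiss_queue; it only assumes that an ID generator is a function of type String Function(), as in the examples above.

```dart
/// Returns an ID generator that yields '<prefix>-<n>' with n counting up
/// from [start]. The counter lives in the closure, so each generator is
/// independent of any other.
String Function() sequentialIdGenerator(String prefix, {int start = 0}) {
  var next = start;
  return () => '$prefix-${next++}';
}
```

The returned function could then be passed wherever an idGenerator is expected, for example idGenerator: sequentialIdGenerator('MSG', start: 1000). IDs produced this way are only unique within one process, so a UUID-based generator is likely the safer choice when several producers share a queue.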

Error Handling #

try {
  await queue.acknowledge('non-existent-id');
} on MessageNotFoundError catch (e) {
  print('Message not found: ${e.messageId}');
}

try {
  await queue.enqueuePayload(complexObject);
} on SerializationError catch (e) {
  print('Serialization failed: ${e.message}');
  print('Cause: ${e.cause}');
}

try {
  final message = await queue.dequeue();
} on DeserializationError catch (e) {
  print('Deserialization failed: ${e.message}');
  print('Raw data: ${e.data}');
  print('Cause: ${e.cause}');
}

🤝 Contributing #

We welcome contributions! Please see our contributing guidelines for details.

Running Tests #

# Run all tests (87 comprehensive tests)
dart test

# Run specific implementation tests
dart test test/in_memory_test.dart

# Run serialization tests
dart test test/serialization_test.dart

# Run performance benchmarks
dart test test/performance_test.dart

📄 License #

This project is licensed under the MIT License - see the LICENSE file for details.

🌟 Why kiss_queue? #

  • Simple: Minimal API surface, easy to understand
  • Reliable: Battle-tested patterns from cloud message queuing services
  • Flexible: Works with any backend via clean interface, supports any serialization format
  • Performant: Optimized for high throughput and low latency
  • Testable: Comprehensive test suite with 87 tests included
  • Production Ready: Used in production applications with full serialization support

Perfect for microservices, event-driven architectures, background job processing, and any application that needs reliable async message processing with flexible data serialization.


Built with ❤️ by the WAMF team. Part of the KISS family of simple, focused Dart packages.

Dependencies

collection, uuid
