Loom

A structured worker pool framework for Dart providing concurrent task execution with priorities, retries, cancellation, and lifecycle management.

Features

  • Priority-based scheduling - Jobs are executed by priority, then by age
  • Configurable retry policies - Fixed, linear, or exponential backoff
  • Cancellation tokens - Cancel jobs before or during execution
  • Lifecycle hooks - Monitor job start, success, failure, and retries
  • Metrics collection - Track throughput, failure rates, and durations
  • Multiple execution modes - Main isolate, background isolates, or test mode
  • Type-safe API - Strongly typed tasks, inputs, and outputs
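
The "by priority, then by age" ordering in the first bullet can be pictured as a two-key comparison. The sketch below is only an illustration in plain Dart: QueuedJob, compareJobs, and executionOrder are hypothetical names, and it assumes that a larger priority value means a more urgent job. It does not show how Loom's scheduler is implemented.

```dart
// Illustrative sketch only: QueuedJob, compareJobs, and executionOrder are
// hypothetical names, not part of Loom's API.
class QueuedJob {
  QueuedJob(this.name, this.priority, this.sequence);

  final String name;
  final int priority; // assumption: larger value = more urgent
  final int sequence; // submission order; smaller value = older job
}

/// Orders jobs by priority (highest first), then by age (oldest first).
int compareJobs(QueuedJob a, QueuedJob b) {
  final byPriority = b.priority.compareTo(a.priority);
  if (byPriority != 0) return byPriority;
  return a.sequence.compareTo(b.sequence);
}

/// Returns the job names in the order they would be executed.
List<String> executionOrder(List<QueuedJob> jobs) {
  final sorted = [...jobs]..sort(compareJobs);
  return [for (final job in sorted) job.name];
}

void main() {
  final jobs = [
    QueuedJob('report', 0, 1),
    QueuedJob('thumbnail', 5, 2),
    QueuedJob('email', 0, 3),
    QueuedJob('login', 5, 4),
  ];
  print(executionOrder(jobs)); // [thumbnail, login, report, email]
}
```

Because the submission sequence breaks ties, jobs of equal priority run in first-in, first-out order.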

Quick Start

import 'package:loom/loom.dart';

// Define a task with explicit input/output types
final parseTask = Task<String, int>.simple(
  name: 'parseNumber',
  executor: (input, ctx) async => int.parse(input),
);

// Create a worker pool
final pool = WorkerPool.io('myPool');

// Submit work and get a handle
final handle = pool.submit(parseTask, '42');

// Wait for result (never throws - failures are wrapped)
final result = await handle.result;
print(result.valueOrThrow); // 42

// Clean up
await pool.shutdown();

Creating Pools

// I/O-optimized pool (for network, file operations)
final ioPool = WorkerPool.io('network');

// CPU-optimized pool (for compute, uses isolates)
final cpuPool = WorkerPool.cpu('compute');

// UI pool (limited workers, avoids congestion)
final uiPool = WorkerPool.ui('ui');

// Custom configuration
final pool = WorkerPool.fromBuilder(
  WorkerPoolBuilder('custom')
    .withWorkers(8)
    .withExecutionMode(ExecutionMode.isolate)
    .withMaxQueueSize(500)
    .withOverflowStrategy(OverflowStrategy.dropOldest)
    .withRetryPolicy(RetryPolicy.exponentialBackoff(
      maxAttempts: 3,
      initialDelay: Duration(seconds: 1),
    )),
);
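
The custom configuration above pairs a maximum queue size with OverflowStrategy.dropOldest. As a rough picture of what a drop-oldest policy usually means for a bounded queue, here is a minimal sketch in plain Dart. DropOldestQueue is a hypothetical class written for this example; Loom's actual queue may behave differently in its details.

```dart
import 'dart:collection';

// Illustrative sketch only: DropOldestQueue is a hypothetical class, not
// Loom's queue implementation.
class DropOldestQueue<T> {
  DropOldestQueue(this.maxSize) : assert(maxSize > 0);

  final int maxSize;
  final Queue<T> _items = Queue<T>();

  /// Adds [item]. If the queue is full, evicts and returns the oldest
  /// entry; otherwise returns null.
  T? add(T item) {
    T? evicted;
    if (_items.length >= maxSize) {
      evicted = _items.removeFirst();
    }
    _items.addLast(item);
    return evicted;
  }

  List<T> toList() => _items.toList();
}

void main() {
  final queue = DropOldestQueue<String>(2);
  queue.add('a');
  queue.add('b');
  final evicted = queue.add('c'); // queue is full, so 'a' is dropped
  print(evicted); // a
  print(queue.toList()); // [b, c]
}
```

Dropping the oldest entry favors fresh work over stale work, which tends to suit inputs that go out of date quickly.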

Task Definition

Tasks are explicit objects with declared input and output types:

// Simple task (works on main isolate)
final task = Task<String, int>.simple(
  name: 'parse',
  executor: (input, ctx) async => int.parse(input),
);

// Main-only task (cannot run on isolates); Widget and Container come from Flutter
final uiTask = Task<void, Widget>.mainOnly(
  name: 'buildWidget',
  executor: (_, ctx) async => Container(),
);

// Isolate-compatible task
final cpuTask = Task<List<int>, int>.simple(
  name: 'sum',
  // fold returns 0 for an empty list, where reduce would throw a StateError
  executor: (numbers, ctx) async => numbers.fold(0, (a, b) => a + b),
  isolateCompatible: true,
);

Progress Reporting

final task = Task<int, void>.simple(
  name: 'download',
  executor: (count, ctx) async {
    for (var i = 0; i < count; i++) {
      // Stop early if the job was cancelled
      ctx.throwIfCancelled();

      await fetchPage(i);

      // Report the completed fraction (reaches 1.0 after the last page)
      ctx.reportProgress((i + 1) / count);
    }
  },
);

final handle = pool.submit(task, 100);
handle.progress.listen((p) => print('Progress: ${(p * 100).toStringAsFixed(0)}%'));

Retry Policies

// No retries (default)
RetryPolicy.none()

// Fixed delay between retries
RetryPolicy.fixed(maxAttempts: 3, delay: Duration(seconds: 1))

// Exponential backoff
RetryPolicy.exponentialBackoff(
  maxAttempts: 5,
  initialDelay: Duration(milliseconds: 100),
  maxDelay: Duration(seconds: 30),
  multiplier: 2.0,
)

// Override per-job
pool.submit(task, input, retryPolicy: RetryPolicy.fixed(
  maxAttempts: 3,
  delay: Duration(seconds: 5),
));
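
To make the exponential backoff parameters concrete, the sketch below computes the conventional capped schedule, delay = min(initialDelay × multiplier^(retry − 1), maxDelay), for the values shown above. backoffDelay is a hypothetical helper written for this example. Whether Loom uses exactly this formula, adds jitter, or counts the first attempt toward maxAttempts is not stated here, so treat the numbers as an approximation.

```dart
import 'dart:math' as math;

// Illustrative sketch only: backoffDelay is a hypothetical helper. It uses
// the conventional capped exponential formula, which may differ from what
// Loom computes internally (for example, it adds no jitter).
Duration backoffDelay({
  required int retry, // 1 for the first retry, 2 for the second, ...
  required Duration initialDelay,
  required Duration maxDelay,
  double multiplier = 2.0,
}) {
  final factor = math.pow(multiplier, retry - 1);
  final micros = initialDelay.inMicroseconds * factor;
  // Cap the delay so long retry chains do not wait unboundedly.
  if (micros >= maxDelay.inMicroseconds) return maxDelay;
  return Duration(microseconds: micros.round());
}

void main() {
  for (var retry = 1; retry <= 4; retry++) {
    final delay = backoffDelay(
      retry: retry,
      initialDelay: const Duration(milliseconds: 100),
      maxDelay: const Duration(seconds: 30),
    );
    print('retry $retry: ${delay.inMilliseconds} ms');
  }
  // retry 1: 100 ms
  // retry 2: 200 ms
  // retry 3: 400 ms
  // retry 4: 800 ms
}
```

With these values the cap only takes effect from the tenth retry onward, where 100 ms × 2^9 = 51.2 s exceeds the 30 s maximum.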

Cancellation

// Create a cancellation token source
final tokenSource = CancellationTokenSource();

// Submit with token
final handle = pool.submit(task, input, cancellationToken: tokenSource.token);

// Cancel from outside
tokenSource.cancel();

// Or use the handle
handle.cancel();

// Inside task, check cancellation
ctx.throwIfCancelled();
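
Cancellation of this kind is cooperative: the task only stops when it reaches a checkpoint such as throwIfCancelled. The following sketch shows the general pattern in plain Dart. SketchTokenSource, SketchToken, SketchCancelledException, and runSteps are hypothetical stand-ins written for this example and are not Loom's classes.

```dart
// Illustrative sketch only: SketchTokenSource, SketchToken, and
// SketchCancelledException are hypothetical stand-ins, not Loom's classes.
class SketchCancelledException implements Exception {}

class SketchToken {
  bool _cancelled = false;
  bool get isCancelled => _cancelled;

  /// Throws if cancellation has been requested.
  void throwIfCancelled() {
    if (_cancelled) throw SketchCancelledException();
  }
}

class SketchTokenSource {
  final SketchToken token = SketchToken();
  void cancel() => token._cancelled = true;
}

/// Processes up to [steps] units of work, checking the token before each.
/// Returns the number of steps completed before cancellation.
Future<int> runSteps(int steps, SketchToken token) async {
  var done = 0;
  try {
    for (var i = 0; i < steps; i++) {
      token.throwIfCancelled();
      await Future<void>.delayed(Duration.zero); // stand-in for real work
      done++;
    }
  } on SketchCancelledException {
    // Cooperative cancellation: stop at the next checkpoint.
  }
  return done;
}

Future<void> main() async {
  final source = SketchTokenSource();
  source.cancel(); // cancelled before execution starts
  print(await runSteps(10, source.token)); // 0
}
```

A long step between two checkpoints cannot be interrupted under this pattern, so checkpoints are best placed at the top of each loop iteration.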

Lifecycle Hooks

final pool = WorkerPool.fromBuilder(
  WorkerPoolBuilder('monitored')
    .withHooks(PoolHooks(
      onJobStart: (id, name) => log('Starting $name'),
      onJobSuccess: (id, result) => log('Completed ${result.taskName}'),
      onJobFailure: (id, error) => log('Failed: ${error.message}'),
      onRetry: (id, error, attempt, delay) => log('Retry #$attempt in $delay'),
      onPoolIdle: () => log('Pool is idle'),
      onPoolShutdown: () => log('Pool shutting down'),
    )),
);

Global Default Pool

// Use the global pool for convenience
final handle = Loom.defaultPool.submit(task, input);

// Clean up at app shutdown
await Loom.shutdown();

// Or replace with custom pool
await Loom.setDefaultPool(myCustomPool);

Error Handling

Awaiting handle.result never throws; a failed job completes with a result that wraps the error:

final result = await handle.result;

if (result.isSuccess) {
  print(result.valueOrThrow);
} else {
  final error = result.errorOrNull!;
  print('Category: ${error.category}'); // taskError, timeout, cancelled, etc.
  print('Message: ${error.message}');
}

// Handle both outcomes in one expression with fold
final value = result.fold(
  onSuccess: (v) => 'Got: $v',
  onFailure: (e) => 'Error: $e',
);
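
For readers new to result wrapping, the sketch below shows the general idea in plain Dart: errors thrown by the work are caught and stored, and fold forces the caller to handle both outcomes. SketchResult and capture are hypothetical names written for this example. They mimic the shape described above but are not Loom's result type.

```dart
// Illustrative sketch only: SketchResult and capture are hypothetical names
// that mimic the shape described above; they are not Loom's result class.
class SketchResult<T> {
  SketchResult.success(T value)
      : _value = value,
        error = null,
        isSuccess = true;

  SketchResult.failure(Object failure)
      : _value = null,
        error = failure,
        isSuccess = false;

  final T? _value;
  final Object? error;
  final bool isSuccess;

  /// Collapses both outcomes into a single value without throwing.
  R fold<R>({
    required R Function(T value) onSuccess,
    required R Function(Object error) onFailure,
  }) =>
      isSuccess ? onSuccess(_value as T) : onFailure(error!);
}

/// Runs [body] and captures any thrown error instead of propagating it.
Future<SketchResult<T>> capture<T>(Future<T> Function() body) async {
  try {
    return SketchResult<T>.success(await body());
  } catch (e) {
    return SketchResult<T>.failure(e);
  }
}

Future<void> main() async {
  final ok = await capture(() async => int.parse('42'));
  final bad = await capture(() async => int.parse('forty-two'));

  String describe(SketchResult<int> r) => r.fold(
        onSuccess: (v) => 'Got: $v',
        onFailure: (e) => 'Error: ${e.runtimeType}',
      );

  print(describe(ok)); // Got: 42
  print(describe(bad)); // Error: FormatException
}
```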

License

BSD-3-Clause
