Loom
A structured worker pool framework for Dart providing concurrent task execution with priorities, retries, cancellation, and lifecycle management.
Features
- Priority-based scheduling - Jobs are executed by priority, then by age
- Configurable retry policies - Fixed, linear, or exponential backoff
- Cancellation tokens - Cancel jobs before or during execution
- Lifecycle hooks - Monitor job start, success, failure, and retries
- Metrics collection - Track throughput, failure rates, and durations
- Multiple execution modes - Main isolate, background isolates, or test mode
- Type-safe API - Strongly typed tasks, inputs, and outputs
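The "priority, then age" ordering mentioned above can be pictured as a two-key comparison. The following is a minimal, self-contained sketch and not Loom's scheduler: `QueuedJob`, `compareJobs`, and `executionOrder` are hypothetical names, and it assumes that a higher number means higher priority and that age is tracked by a submission counter.

```dart
// Hypothetical job record; Loom's internal job type is not shown in this README.
class QueuedJob {
  QueuedJob(this.name, this.priority, this.sequence);

  final String name;
  final int priority; // Assumption: a higher number is more urgent.
  final int sequence; // Submission counter; a lower number is older.
}

// Orders by priority first (highest first), then by age (oldest first).
int compareJobs(QueuedJob a, QueuedJob b) {
  final byPriority = b.priority.compareTo(a.priority);
  if (byPriority != 0) return byPriority;
  return a.sequence.compareTo(b.sequence);
}

// Returns job names in the order they would be picked up.
List<String> executionOrder(List<QueuedJob> jobs) {
  final sorted = [...jobs]..sort(compareJobs);
  return [for (final job in sorted) job.name];
}

void main() {
  final order = executionOrder([
    QueuedJob('cleanup', 0, 1),
    QueuedJob('render', 5, 2),
    QueuedJob('sync', 0, 3),
    QueuedJob('save', 5, 4),
  ]);
  print(order); // [render, save, cleanup, sync]
}
```

Using age as the tie-breaker keeps jobs of equal priority in first-in, first-out order.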
Quick Start
import 'package:loom/loom.dart';
// Define a task with explicit input/output types
final parseTask = Task<String, int>.simple(
  name: 'parseNumber',
  executor: (input, ctx) async => int.parse(input),
);
// Create a worker pool
final pool = WorkerPool.io('myPool');
// Submit work and get a handle
final handle = pool.submit(parseTask, '42');
// Wait for the result (awaiting never throws; failures are wrapped in the result)
final result = await handle.result;
print(result.valueOrThrow); // 42
// Clean up
await pool.shutdown();
Creating Pools
// I/O-optimized pool (for network, file operations)
final ioPool = WorkerPool.io('network');
// CPU-optimized pool (for compute, uses isolates)
final cpuPool = WorkerPool.cpu('compute');
// UI pool (limited workers, avoids congestion)
final uiPool = WorkerPool.ui('ui');
// Custom configuration
final pool = WorkerPool.fromBuilder(
  WorkerPoolBuilder('custom')
      .withWorkers(8)
      .withExecutionMode(ExecutionMode.isolate)
      .withMaxQueueSize(500)
      .withOverflowStrategy(OverflowStrategy.dropOldest)
      .withRetryPolicy(RetryPolicy.exponentialBackoff(
        maxAttempts: 3,
        initialDelay: Duration(seconds: 1),
      )),
);
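To illustrate what a bounded queue with a drop-oldest overflow strategy might do, here is a small standalone sketch. It is one plausible reading of `OverflowStrategy.dropOldest`, not Loom's implementation; `BoundedQueue` is a hypothetical class written only for this example.

```dart
import 'dart:collection';

// Hypothetical bounded queue; illustrates one reading of "drop oldest".
class BoundedQueue<T> {
  BoundedQueue(this.maxSize) : assert(maxSize > 0);

  final int maxSize;
  final Queue<T> _items = Queue<T>();

  // Adds an item. If the queue is already full, the oldest item is evicted
  // and returned; otherwise null is returned.
  T? add(T item) {
    T? dropped;
    if (_items.length >= maxSize) {
      dropped = _items.removeFirst();
    }
    _items.addLast(item);
    return dropped;
  }

  List<T> toList() => _items.toList();
}

void main() {
  final queue = BoundedQueue<String>(2);
  queue.add('a');
  queue.add('b');
  final dropped = queue.add('c'); // Queue is full, so 'a' is evicted.
  print(dropped); // a
  print(queue.toList()); // [b, c]
}
```

Dropping the oldest entry favors fresh work over stale work, which tends to suit cases where newer jobs supersede older ones.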
Task Definition
Tasks are explicit objects with a name and declared input/output types:
// Simple task (works on main isolate)
final task = Task<String, int>.simple(
  name: 'parse',
  executor: (input, ctx) async => int.parse(input),
);
// Main-only task (cannot run on isolates)
final uiTask = Task<void, Widget>.mainOnly(
  name: 'buildWidget',
  executor: (_, ctx) async => Container(),
);
// Isolate-compatible task
final cpuTask = Task<List<int>, int>.simple(
  name: 'sum',
  executor: (numbers, ctx) async => numbers.reduce((a, b) => a + b),
  isolateCompatible: true,
);
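For background on what isolate compatibility involves, the sketch below runs a summing function on a background isolate using `dart:isolate` directly, without Loom. It assumes Dart 2.19 or later (for `Isolate.run`) and a non-web platform; `sum` is a hypothetical helper. How Loom dispatches isolate-compatible tasks internally is not described in this README.

```dart
import 'dart:isolate';

// Top-level function: safe to call from a closure sent to another isolate.
// fold with an initial value also handles an empty list.
int sum(List<int> numbers) => numbers.fold<int>(0, (a, b) => a + b);

Future<void> main() async {
  // Isolate.run spawns an isolate, runs the closure there, and returns
  // the result to the calling isolate.
  final total = await Isolate.run(() => sum([1, 2, 3, 4]));
  print(total); // 10
}
```

The closure and its result must be sendable between isolates, which is why plain data such as `List<int>` and `int` works well here.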
Progress Reporting
final task = Task<int, void>.simple(
  name: 'download',
  executor: (count, ctx) async {
    for (var i = 0; i < count; i++) {
      // Stop early if the job has been cancelled
      ctx.throwIfCancelled();
      // fetchPage is assumed to be defined elsewhere
      await fetchPage(i);
      // Report the fraction of pages completed (reaches 1.0 after the last page)
      ctx.reportProgress((i + 1) / count);
    }
  },
);
final handle = pool.submit(task, 100);
handle.progress.listen(
  (p) => print('Progress: ${(p * 100).toStringAsFixed(0)}%'),
);
Retry Policies
// No retries (default)
RetryPolicy.none()
// Fixed delay between retries
RetryPolicy.fixed(maxAttempts: 3, delay: Duration(seconds: 1))
// Exponential backoff
RetryPolicy.exponentialBackoff(
  maxAttempts: 5,
  initialDelay: Duration(milliseconds: 100),
  maxDelay: Duration(seconds: 30),
  multiplier: 2.0,
)
// Override per-job
pool.submit(task, input, retryPolicy: RetryPolicy.fixed(
  maxAttempts: 3,
  delay: Duration(seconds: 5),
));
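As a rough guide to how the exponential backoff parameters interact, the sketch below computes capped delays from `initialDelay`, `multiplier`, and `maxDelay`. It is an assumption about the usual formula (initial delay times multiplier raised to the retry index, capped at the maximum), not a statement of what Loom computes; Loom may also add jitter or count attempts differently. `backoffDelay` is a hypothetical helper.

```dart
import 'dart:math' as math;

// Delay before retry number `retry` (1-based), capped at maxDelay.
// Assumed formula: initialDelay * multiplier^(retry - 1).
Duration backoffDelay(
  int retry, {
  required Duration initialDelay,
  required Duration maxDelay,
  double multiplier = 2.0,
}) {
  final factor = math.pow(multiplier, retry - 1);
  final micros = initialDelay.inMicroseconds * factor;
  // Compare before rounding so very large values never overflow.
  if (micros >= maxDelay.inMicroseconds) return maxDelay;
  return Duration(microseconds: micros.round());
}

void main() {
  for (var retry = 1; retry <= 4; retry++) {
    final delay = backoffDelay(
      retry,
      initialDelay: const Duration(milliseconds: 100),
      maxDelay: const Duration(seconds: 30),
    );
    print('retry $retry: ${delay.inMilliseconds} ms');
  }
  // retry 1: 100 ms
  // retry 2: 200 ms
  // retry 3: 400 ms
  // retry 4: 800 ms
}
```

With these values the cap only matters from the tenth retry onward, since 100 ms doubled nine times is 51.2 s, which exceeds the 30 s limit.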
Cancellation
// Create a cancellation token
final tokenSource = CancellationTokenSource();
// Submit with token
final handle = pool.submit(task, input, cancellationToken: tokenSource.token);
// Cancel from outside
tokenSource.cancel();
// Or use the handle
handle.cancel();
// Inside the task, check for cancellation
ctx.throwIfCancelled();
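Cancellation of this kind is cooperative: the task only stops at the points where it checks the token. The standalone sketch below shows the idea with hypothetical stand-ins (`SimpleToken`, `SimpleTokenSource`, `CancelledException`, `processAll`); Loom's own `CancellationTokenSource` and error types may behave differently.

```dart
// Hypothetical stand-ins for illustration only.
class CancelledException implements Exception {
  @override
  String toString() => 'CancelledException';
}

class SimpleToken {
  bool _cancelled = false;

  bool get isCancelled => _cancelled;

  void throwIfCancelled() {
    if (_cancelled) throw CancelledException();
  }
}

class SimpleTokenSource {
  final SimpleToken token = SimpleToken();

  void cancel() => token._cancelled = true;
}

// Processes up to `count` items, checking the token before each one.
// Returns how many items were completed before cancellation.
Future<int> processAll(int count, SimpleToken token) async {
  var done = 0;
  try {
    for (var i = 0; i < count; i++) {
      token.throwIfCancelled();
      await Future<void>.delayed(Duration.zero); // Simulated unit of work.
      done++;
    }
  } on CancelledException {
    // Cancelled between items: stop and report partial progress.
  }
  return done;
}

Future<void> main() async {
  final source = SimpleTokenSource();
  final pending = processAll(1000, source.token);
  source.cancel(); // Requested while the first item is in flight.
  print(await pending); // 1
}
```

The first item still completes because the cancellation request arrives after its check; the loop stops at the next check.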
Lifecycle Hooks
final pool = WorkerPool.fromBuilder(
  WorkerPoolBuilder('monitored')
      .withHooks(PoolHooks(
        onJobStart: (id, name) => log('Starting $name'),
        onJobSuccess: (id, result) => log('Completed ${result.taskName}'),
        onJobFailure: (id, error) => log('Failed: ${error.message}'),
        onRetry: (id, error, attempt, delay) => log('Retry #$attempt in $delay'),
        onPoolIdle: () => log('Pool is idle'),
        onPoolShutdown: () => log('Pool shutting down'),
      )),
);
Global Default Pool
// Use the global pool for convenience
final handle = Loom.defaultPool.submit(task, input);
// Clean up at app shutdown
await Loom.shutdown();
// Or replace it with a custom pool
await Loom.setDefaultPool(myCustomPool);
Error Handling
Awaiting handle.result never throws; failures are wrapped in the result:
final result = await handle.result;
if (result.isSuccess) {
  print(result.valueOrThrow);
} else {
  final error = result.errorOrNull!;
  print('Category: ${error.category}'); // taskError, timeout, cancelled, etc.
  print('Message: ${error.message}');
}
// Handle both outcomes in one expression with fold
final value = result.fold(
  onSuccess: (v) => 'Got: $v',
  onFailure: (e) => 'Error: $e',
);
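The "failures are wrapped" pattern can be sketched independently of Loom with a small result type. This is an illustrative model only: `Result`, `Success`, `Failure`, and `capture` are hypothetical names, the shape of Loom's real result class is not shown in this README, and the code assumes Dart 3.0 or later for sealed classes and switch expressions.

```dart
// Hypothetical result type for illustration.
sealed class Result<T> {
  const Result();

  // Collapses both outcomes into a single value of type R.
  R fold<R>({
    required R Function(T value) onSuccess,
    required R Function(Object error) onFailure,
  }) {
    return switch (this) {
      Success<T>(:final value) => onSuccess(value),
      Failure<T>(:final error) => onFailure(error),
    };
  }
}

final class Success<T> extends Result<T> {
  const Success(this.value);
  final T value;
}

final class Failure<T> extends Result<T> {
  const Failure(this.error);
  final Object error;
}

// Runs `body` and captures anything it throws instead of propagating it.
Result<T> capture<T>(T Function() body) {
  try {
    return Success<T>(body());
  } catch (error) {
    return Failure<T>(error);
  }
}

void main() {
  final ok = capture(() => int.parse('42'));
  final bad = capture(() => int.parse('not a number'));
  print(ok.fold(onSuccess: (v) => 'Got: $v', onFailure: (e) => 'Error'));
  print(bad.fold(onSuccess: (v) => 'Got: $v', onFailure: (e) => 'Error'));
  // Got: 42
  // Error
}
```

Because `fold` requires a handler for each outcome, the caller cannot forget the failure path, which is the main appeal of wrapping errors rather than throwing them.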
License
BSD-3-Clause