loom 1.0.0
loom: ^1.0.0 copied to clipboard
A lightweight, flexible worker pool framework for Dart with priority scheduling, retry policies, and graceful cancellation.
// ignore_for_file: avoid_print
import 'package:loom/loom.dart';
/// Loom Examples - Comprehensive demonstration of the worker pool framework.
///
/// Run with: dart run example/loom_example.dart
Future<void> main() async {
print('=== Loom Worker Pool Examples ===\n');
await basicUsageExample();
await prioritySchedulingExample();
await retryPolicyExample();
await cancellationExample();
await progressReportingExample();
await lifecycleHooksExample();
await errorHandlingExample();
await customPoolConfigurationExample();
await globalPoolExample();
print('\n=== All Examples Complete ===');
}
// ============================================================================
// Example 1: Basic Usage
// ============================================================================
/// Demonstrates the fundamental pattern: define a task, create a pool, submit work.
Future<void> basicUsageExample() async {
print('--- Example 1: Basic Usage ---');
// Define a typed task with input String and output int
final parseTask = Task<String, int>.simple(
name: 'parseNumber',
executor: (input, ctx) async {
return int.parse(input);
},
);
// Create an I/O-optimized pool (good for async operations)
final pool = WorkerPool.io('basic-example');
try {
// Submit work and get a handle for tracking
final handle = pool.submit(parseTask, '42');
// Wait for the result (never throws - errors are wrapped)
final result = await handle.result;
// Check success and extract value
if (result.isSuccess) {
print(' Parsed value: ${result.valueOrThrow}');
print(' Duration: ${result.duration.inMicroseconds}μs');
print(' Retry count: ${result.retryCount}');
}
} finally {
await pool.shutdown();
}
print('');
}
// ============================================================================
// Example 2: Priority Scheduling
// ============================================================================
/// Shows how jobs are scheduled based on priority.
/// Higher priority jobs are processed before lower priority ones.
Future<void> prioritySchedulingExample() async {
print('--- Example 2: Priority Scheduling ---');
final results = <String>[];
final task = Task<String, String>.simple(
name: 'priorityTask',
executor: (input, ctx) async {
// Small delay to simulate work
await Future<void>.delayed(const Duration(milliseconds: 10));
return input;
},
);
// Single worker pool to demonstrate ordering
final pool = WorkerPool.fromBuilder(
WorkerPoolBuilder('priority-example').withWorkers(1).withExecutionMode(ExecutionMode.test),
);
try {
// Submit jobs in this order: low, normal, critical, high
// They should complete: first-submitted, then critical, high, normal, low
final handles = [
pool.submit(task, 'low', priority: Priority.low),
pool.submit(task, 'normal', priority: Priority.normal),
pool.submit(task, 'critical', priority: Priority.critical),
pool.submit(task, 'high', priority: Priority.high),
];
// Collect results in completion order
for (final handle in handles) {
final result = await handle.result;
results.add(result.valueOrThrow);
}
print(' Completion order: ${results.join(' -> ')}');
print(' (First job runs immediately, then queue processes by priority)');
} finally {
await pool.shutdown();
}
print('');
}
// ============================================================================
// Example 3: Retry Policies
// ============================================================================
/// Demonstrates different retry strategies for handling transient failures.
Future<void> retryPolicyExample() async {
print('--- Example 3: Retry Policies ---');
var attemptCount = 0;
// A task that fails twice before succeeding
final flakyTask = Task<int, String>.simple(
name: 'flakyOperation',
executor: (maxFailures, ctx) async {
attemptCount++;
if (attemptCount <= maxFailures) {
throw Exception('Transient failure #$attemptCount');
}
return 'Success after $attemptCount attempts';
},
);
final pool = WorkerPool.fromBuilder(
WorkerPoolBuilder('retry-example').withExecutionMode(ExecutionMode.test),
);
try {
// Fixed delay retry - same delay between each attempt
attemptCount = 0;
final handle1 = pool.submit(
flakyTask,
2, // Fail twice
retryPolicy: RetryPolicy.fixed(
maxAttempts: 5,
delay: const Duration(milliseconds: 10),
),
);
final result1 = await handle1.result;
print(' Fixed retry: ${result1.valueOrThrow}');
// Exponential backoff - delay doubles each time
attemptCount = 0;
final handle2 = pool.submit(
flakyTask,
2,
retryPolicy: RetryPolicy.exponentialBackoff(
maxAttempts: 5,
initialDelay: const Duration(milliseconds: 10),
maxDelay: const Duration(seconds: 1),
multiplier: 2.0,
),
);
final result2 = await handle2.result;
print(' Exponential backoff: ${result2.valueOrThrow}');
// No retry - fail immediately
attemptCount = 0;
final handle3 = pool.submit(
flakyTask,
2,
retryPolicy: const RetryPolicy.none(),
);
final result3 = await handle3.result;
print(' No retry: ${result3.isFailure ? "Failed as expected" : "Unexpected success"}');
} finally {
await pool.shutdown();
}
print('');
}
// ============================================================================
// Example 4: Cancellation
// ============================================================================
/// Shows how to cancel jobs using cancellation tokens.
Future<void> cancellationExample() async {
print('--- Example 4: Cancellation ---');
final task = Task<int, String>.simple(
name: 'longRunningTask',
executor: (iterations, ctx) async {
for (var i = 0; i < iterations; i++) {
// Always check for cancellation at safe points in your loop
ctx.throwIfCancelled();
// Simulate work
await Future<void>.delayed(const Duration(milliseconds: 5));
ctx.reportProgress(i / iterations);
}
return 'Completed all $iterations iterations';
},
);
// Use single worker pool to demonstrate queue-based cancellation
final pool = WorkerPool.fromBuilder(
WorkerPoolBuilder('cancellation-example').withWorkers(1).withExecutionMode(ExecutionMode.main),
);
try {
// Method 1: Pre-cancelled token (job fails immediately)
final tokenSource1 = CancellationTokenSource();
tokenSource1.cancel(); // Pre-cancel before submitting
final handle1 = pool.submit(
task,
10,
cancellationToken: tokenSource1.token,
);
final result1 = await handle1.result;
print(' Pre-cancelled token: cancelled=${result1.cancelled}');
// Method 2: Cancel queued job before it runs
// First, submit a blocking job to occupy the single worker
final blockingHandle = pool.submit(task, 10);
// Submit another job that will be queued
final tokenSource2 = CancellationTokenSource();
final handle2 = pool.submit(
task,
10,
cancellationToken: tokenSource2.token,
);
// Cancel the queued job before it starts
tokenSource2.cancel();
// Wait for both jobs
final result2 = await handle2.result;
await blockingHandle.result;
print(' Cancelled queued job: cancelled=${result2.cancelled}');
// Method 3: Token source can be reused pattern
final tokenSource3 = CancellationTokenSource();
final blockingHandle2 = pool.submit(task, 10);
final handle3 = pool.submit(task, 10, cancellationToken: tokenSource3.token);
tokenSource3.cancel(); // Cancel via the token source
final result3 = await handle3.result;
await blockingHandle2.result;
print(' Another token example: cancelled=${result3.cancelled}');
// Show error details for cancelled jobs
if (result3.cancelled) {
print(' Error category: ${result3.errorOrNull?.category.name}');
}
} finally {
await pool.shutdown();
}
print('');
}
// ============================================================================
// Example 5: Progress Reporting
// ============================================================================
/// Demonstrates how tasks can report progress to the caller.
Future<void> progressReportingExample() async {
print('--- Example 5: Progress Reporting ---');
final task = Task<int, String>.simple(
name: 'downloadSimulation',
executor: (totalSteps, ctx) async {
for (var i = 0; i <= totalSteps; i++) {
// Report progress as a fraction (0.0 to 1.0)
ctx.reportProgress(i / totalSteps);
// Or report custom progress data
ctx.reportProgress({'step': i, 'total': totalSteps});
await Future<void>.delayed(const Duration(milliseconds: 20));
}
return 'Downloaded $totalSteps chunks';
},
);
final pool = WorkerPool.fromBuilder(
WorkerPoolBuilder('progress-example').withExecutionMode(ExecutionMode.test),
);
try {
final handle = pool.submit(task, 5);
// Listen to progress updates
final progressValues = <Object>[];
handle.progress.listen((progress) {
progressValues.add(progress);
});
final result = await handle.result;
print(' Result: ${result.valueOrThrow}');
print(' Progress updates received: ${progressValues.length}');
print(' Sample progress: ${progressValues.take(3).join(", ")}...');
} finally {
await pool.shutdown();
}
print('');
}
// ============================================================================
// Example 6: Lifecycle Hooks
// ============================================================================
/// Shows how to monitor pool activity using lifecycle hooks.
Future<void> lifecycleHooksExample() async {
print('--- Example 6: Lifecycle Hooks ---');
final events = <String>[];
final pool = WorkerPool.fromBuilder(
WorkerPoolBuilder('hooks-example')
.withExecutionMode(ExecutionMode.test)
.withHooks(
PoolHooks(
onJobStart: (jobId, taskName) {
events.add('START: $taskName');
},
onJobSuccess: (jobId, result) {
events.add('SUCCESS: ${result.taskName} (${result.duration.inMicroseconds}μs)');
},
onJobFailure: (jobId, error) {
events.add('FAILURE: ${error.category.name}');
},
onRetry: (jobId, error, attempt, delay) {
events.add('RETRY: attempt #$attempt after $delay');
},
onPoolIdle: () {
events.add('IDLE: pool is idle');
},
onPoolShutdown: () {
events.add('SHUTDOWN: pool shutting down');
},
),
),
);
try {
final successTask = Task<String, int>.simple(
name: 'parseNumber',
executor: (input, ctx) async => int.parse(input),
);
var attempts = 0;
final retryTask = Task<void, String>.simple(
name: 'retryableTask',
executor: (_, ctx) async {
attempts++;
if (attempts < 2) throw Exception('Try again');
return 'Done';
},
);
// Submit jobs
await pool.submit(successTask, '42').result;
await pool
.submit(
retryTask,
null,
retryPolicy: RetryPolicy.fixed(
maxAttempts: 3,
delay: Duration.zero,
),
)
.result;
// Wait for idle
await Future<void>.delayed(const Duration(milliseconds: 50));
} finally {
await pool.shutdown();
}
print(' Events captured:');
for (final event in events) {
print(' - $event');
}
print('');
}
// ============================================================================
// Example 7: Error Handling
// ============================================================================
/// Demonstrates structured error handling - no raw exceptions leak through.
Future<void> errorHandlingExample() async {
print('--- Example 7: Error Handling ---');
final failingTask = Task<String, int>.simple(
name: 'failingTask',
executor: (input, ctx) async {
if (input == 'invalid') {
throw const FormatException('Cannot parse "invalid"');
}
return int.parse(input);
},
);
final pool = WorkerPool.fromBuilder(
WorkerPoolBuilder('error-example').withExecutionMode(ExecutionMode.test),
);
try {
final handle = pool.submit(failingTask, 'invalid');
final result = await handle.result;
// Results never throw - check isSuccess/isFailure
print(' Is failure: ${result.isFailure}');
// Access error details safely
final error = result.errorOrNull;
if (error != null) {
print(' Error category: ${error.category.name}');
print(' Error message: ${error.message}');
print(' Original cause: ${error.cause}');
}
// Pattern matching with fold
final message = result.fold(
onSuccess: (value) => 'Got value: $value',
onFailure: (error) => 'Error: ${error.category.name}',
);
print(' Fold result: $message');
// Map transforms success, preserves failure
final mapped = result.map((value) => value * 2);
print(' Mapped still failure: ${mapped.isFailure}');
// Different error categories
print(' Error categories available:');
for (final category in JobErrorCategory.values) {
print(' - ${category.name}');
}
} finally {
await pool.shutdown();
}
print('');
}
// ============================================================================
// Example 8: Custom Pool Configuration
// ============================================================================
/// Shows all the configuration options available for worker pools.
Future<void> customPoolConfigurationExample() async {
print('--- Example 8: Custom Pool Configuration ---');
// Highly customized pool
final pool = WorkerPool.fromBuilder(
WorkerPoolBuilder('custom-pool')
// Number of concurrent workers
.withWorkers(4)
// Execution mode: main (async), isolate (parallel), test (sync)
.withExecutionMode(ExecutionMode.main)
// Maximum jobs that can wait in queue
.withMaxQueueSize(100)
// What to do when queue is full
.withOverflowStrategy(OverflowStrategy.dropOldest)
// Default priority for jobs without explicit priority
.withDefaultPriority(Priority.normal)
// Default retry policy for all jobs
.withRetryPolicy(
RetryPolicy.exponentialBackoff(
maxAttempts: 3,
initialDelay: const Duration(milliseconds: 100),
),
),
);
print(' Pool created with:');
print(' - Name: ${pool.name}');
print(' - Workers: ${pool.config.workerCount}');
print(' - Execution mode: ${pool.config.executionMode.name}');
print(' - Max queue size: ${pool.config.maxQueueSize}');
print(' - Overflow strategy: ${pool.config.overflowStrategy.name}');
// Factory presets for common use cases
print(' Factory presets:');
print(' - WorkerPool.cpu() - Isolate execution for compute');
print(' - WorkerPool.io() - Main isolate for async I/O');
print(' - WorkerPool.ui() - Limited workers for UI updates');
await pool.shutdown();
print('');
}
// ============================================================================
// Example 9: Global Default Pool
// ============================================================================
/// Shows usage of the global Loom singleton for convenience.
Future<void> globalPoolExample() async {
print('--- Example 9: Global Default Pool ---');
final task = Task<int, int>.simple(
name: 'double',
executor: (input, ctx) async => input * 2,
);
// Use the global default pool (lazily created)
final handle = Loom.defaultPool.submit(task, 21);
final result = await handle.result;
print(' Result from default pool: ${result.valueOrThrow}');
// Access pool info
print(' Default pool name: ${Loom.defaultPool.name}');
print(' Default pool state: ${Loom.defaultPool.state.name}');
// Replace with custom pool if needed
final customPool = WorkerPool.fromBuilder(
WorkerPoolBuilder('my-global-pool').withWorkers(2).withExecutionMode(ExecutionMode.test),
);
await Loom.setDefaultPool(customPool);
print(' Replaced default pool with: ${Loom.defaultPool.name}');
// Always clean up at app shutdown
await Loom.shutdown();
print(' Global pool shut down');
print('');
}