loom 1.0.0
Dependency: loom: ^1.0.0

A lightweight, flexible worker pool framework for Dart with priority scheduling, retry policies, and graceful cancellation.

example/loom_example.dart

// ignore_for_file: avoid_print

import 'package:loom/loom.dart';

/// Entry point: runs every Loom example in this file, one after another.
///
/// Run with: dart run example/loom_example.dart
Future<void> main() async {
  print('=== Loom Worker Pool Examples ===\n');

  // The examples are listed in presentation order; each one is awaited to
  // completion before the next begins.
  final examples = <Future<void> Function()>[
    basicUsageExample,
    prioritySchedulingExample,
    retryPolicyExample,
    cancellationExample,
    progressReportingExample,
    lifecycleHooksExample,
    errorHandlingExample,
    customPoolConfigurationExample,
    globalPoolExample,
  ];
  for (final runExample in examples) {
    await runExample();
  }

  print('\n=== All Examples Complete ===');
}

// ============================================================================
// Example 1: Basic Usage
// ============================================================================

/// Walks through the core workflow: declare a task, build a pool, submit
/// an input, and inspect the outcome.
Future<void> basicUsageExample() async {
  print('--- Example 1: Basic Usage ---');

  // A typed task: takes a String, produces an int.
  final parseTask = Task<String, int>.simple(
    name: 'parseNumber',
    executor: (input, ctx) async => int.parse(input),
  );

  // I/O-oriented preset pool (intended for async operations).
  final pool = WorkerPool.io('basic-example');

  try {
    // Submitting returns a handle; its `result` future yields the outcome
    // wrapper (errors are wrapped rather than thrown).
    final outcome = await pool.submit(parseTask, '42').result;

    // Only a successful outcome carries a value worth printing.
    if (outcome.isSuccess) {
      final elapsedMicros = outcome.duration.inMicroseconds;
      print('  Parsed value: ${outcome.valueOrThrow}');
      print('  Duration: ${elapsedMicros}μs');
      print('  Retry count: ${outcome.retryCount}');
    }
  } finally {
    // Release the pool whether or not the job succeeded.
    await pool.shutdown();
  }
  print('');
}

// ============================================================================
// Example 2: Priority Scheduling
// ============================================================================

/// Shows how jobs are scheduled based on priority.
///
/// Four jobs with different priorities are submitted to a single-worker
/// pool, and the order in which they actually finish is printed.
Future<void> prioritySchedulingExample() async {
  print('--- Example 2: Priority Scheduling ---');

  // Filled in completion order by the callbacks attached below.
  final results = <String>[];

  final task = Task<String, String>.simple(
    name: 'priorityTask',
    executor: (input, ctx) async {
      // Small delay to simulate work
      await Future<void>.delayed(const Duration(milliseconds: 10));
      return input;
    },
  );

  // Single worker pool to demonstrate ordering
  final pool = WorkerPool.fromBuilder(
    WorkerPoolBuilder('priority-example').withWorkers(1).withExecutionMode(ExecutionMode.test),
  );

  try {
    // Submit jobs in this order: low, normal, critical, high.
    // Expected completion order (assuming the first job starts as soon as it
    // is submitted): low, then critical, high, normal.
    final handles = [
      pool.submit(task, 'low', priority: Priority.low),
      pool.submit(task, 'normal', priority: Priority.normal),
      pool.submit(task, 'critical', priority: Priority.critical),
      pool.submit(task, 'high', priority: Priority.high),
    ];

    // Record each value at the moment its own job finishes. Awaiting the
    // handles one by one in list order would always yield submission order,
    // regardless of how the pool scheduled the jobs.
    await Future.wait([
      for (final handle in handles)
        handle.result.then((result) {
          results.add(result.valueOrThrow);
        }),
    ]);

    print('  Completion order: ${results.join(' -> ')}');
    print('  (First job runs immediately, then queue processes by priority)');
  } finally {
    await pool.shutdown();
  }
  print('');
}

// ============================================================================
// Example 3: Retry Policies
// ============================================================================

/// Runs one flaky task under three retry policies (fixed delay,
/// exponential backoff, and no retry) and prints how each run ends.
Future<void> retryPolicyExample() async {
  print('--- Example 3: Retry Policies ---');

  // Shared attempt counter; reset before every submission below.
  var attemptCount = 0;

  // Throws until it has been attempted more than `maxFailures` times.
  final flakyTask = Task<int, String>.simple(
    name: 'flakyOperation',
    executor: (maxFailures, ctx) async {
      attemptCount += 1;
      if (attemptCount > maxFailures) {
        return 'Success after $attemptCount attempts';
      }
      throw Exception('Transient failure #$attemptCount');
    },
  );

  final pool = WorkerPool.fromBuilder(
    WorkerPoolBuilder('retry-example').withExecutionMode(ExecutionMode.test),
  );

  try {
    // 1) Fixed delay: the same pause before every retry. The task is told
    //    to fail twice.
    attemptCount = 0;
    final fixedOutcome = await pool
        .submit(
          flakyTask,
          2,
          retryPolicy: RetryPolicy.fixed(
            maxAttempts: 5,
            delay: const Duration(milliseconds: 10),
          ),
        )
        .result;
    print('  Fixed retry: ${fixedOutcome.valueOrThrow}');

    // 2) Exponential backoff: configured with a 2.0 multiplier and a cap.
    attemptCount = 0;
    final backoffOutcome = await pool
        .submit(
          flakyTask,
          2,
          retryPolicy: RetryPolicy.exponentialBackoff(
            maxAttempts: 5,
            initialDelay: const Duration(milliseconds: 10),
            maxDelay: const Duration(seconds: 1),
            multiplier: 2.0,
          ),
        )
        .result;
    print('  Exponential backoff: ${backoffOutcome.valueOrThrow}');

    // 3) No retry: the first failure is final.
    attemptCount = 0;
    final noRetryOutcome = await pool
        .submit(
          flakyTask,
          2,
          retryPolicy: const RetryPolicy.none(),
        )
        .result;
    final verdict = noRetryOutcome.isFailure ? 'Failed as expected' : 'Unexpected success';
    print('  No retry: $verdict');
  } finally {
    await pool.shutdown();
  }
  print('');
}

// ============================================================================
// Example 4: Cancellation
// ============================================================================

/// Shows how to cancel jobs using cancellation tokens.
///
/// Covers a token cancelled before submission and tokens cancelled while
/// their job is still waiting behind another job on a single-worker pool.
Future<void> cancellationExample() async {
  print('--- Example 4: Cancellation ---');

  final task = Task<int, String>.simple(
    name: 'longRunningTask',
    executor: (iterations, ctx) async {
      for (var i = 0; i < iterations; i++) {
        // Always check for cancellation at safe points in your loop
        ctx.throwIfCancelled();

        // Simulate work
        await Future<void>.delayed(const Duration(milliseconds: 5));

        // Step `i` has just finished, so `i + 1` of `iterations` steps are
        // done; this makes the final report exactly 1.0.
        ctx.reportProgress((i + 1) / iterations);
      }
      return 'Completed all $iterations iterations';
    },
  );

  // Use single worker pool to demonstrate queue-based cancellation
  final pool = WorkerPool.fromBuilder(
    WorkerPoolBuilder('cancellation-example').withWorkers(1).withExecutionMode(ExecutionMode.main),
  );

  try {
    // Method 1: Pre-cancelled token (job fails immediately)
    final tokenSource1 = CancellationTokenSource();
    tokenSource1.cancel(); // Pre-cancel before submitting

    final handle1 = pool.submit(
      task,
      10,
      cancellationToken: tokenSource1.token,
    );

    final result1 = await handle1.result;
    print('  Pre-cancelled token: cancelled=${result1.cancelled}');

    // Method 2: Cancel queued job before it runs
    // First, submit a blocking job to occupy the single worker
    final blockingHandle = pool.submit(task, 10);

    // Submit another job that will be queued
    final tokenSource2 = CancellationTokenSource();
    final handle2 = pool.submit(
      task,
      10,
      cancellationToken: tokenSource2.token,
    );

    // Cancel the queued job before it starts
    tokenSource2.cancel();

    // Wait for both jobs
    final result2 = await handle2.result;
    await blockingHandle.result;
    print('  Cancelled queued job: cancelled=${result2.cancelled}');

    // Method 3: Same queued-job pattern again with a fresh token source,
    // this time also inspecting the error attached to the cancelled result.
    final tokenSource3 = CancellationTokenSource();
    final blockingHandle2 = pool.submit(task, 10);
    final handle3 = pool.submit(task, 10, cancellationToken: tokenSource3.token);
    tokenSource3.cancel(); // Cancel via the token source

    final result3 = await handle3.result;
    await blockingHandle2.result;
    print('  Another token example: cancelled=${result3.cancelled}');

    // Show error details for cancelled jobs
    if (result3.cancelled) {
      print('  Error category: ${result3.errorOrNull?.category.name}');
    }
  } finally {
    await pool.shutdown();
  }
  print('');
}

// ============================================================================
// Example 5: Progress Reporting
// ============================================================================

/// Demonstrates how tasks can report progress to the caller.
///
/// The task reports both a fractional value and a custom map on every step;
/// the caller collects whatever arrives on the handle's progress stream and
/// prints a summary once the job has finished.
Future<void> progressReportingExample() async {
  print('--- Example 5: Progress Reporting ---');

  final task = Task<int, String>.simple(
    name: 'downloadSimulation',
    executor: (totalSteps, ctx) async {
      // Inclusive upper bound so that both 0.0 and 1.0 are reported.
      for (var i = 0; i <= totalSteps; i++) {
        // Report progress as a fraction (0.0 to 1.0). A zero-step run is
        // reported as complete instead of dividing 0 by 0 (which is NaN).
        ctx.reportProgress(totalSteps == 0 ? 1.0 : i / totalSteps);

        // Or report custom progress data
        ctx.reportProgress({'step': i, 'total': totalSteps});

        await Future<void>.delayed(const Duration(milliseconds: 20));
      }
      return 'Downloaded $totalSteps chunks';
    },
  );

  final pool = WorkerPool.fromBuilder(
    WorkerPoolBuilder('progress-example').withExecutionMode(ExecutionMode.test),
  );

  try {
    final handle = pool.submit(task, 5);

    // Listen to progress updates
    final progressValues = <Object>[];
    final subscription = handle.progress.listen((progress) {
      progressValues.add(progress);
    });

    try {
      final result = await handle.result;
      print('  Result: ${result.valueOrThrow}');
      print('  Progress updates received: ${progressValues.length}');
      print('  Sample progress: ${progressValues.take(3).join(", ")}...');
    } finally {
      // Release the listener explicitly instead of relying on the pool
      // shutdown to tear the stream down.
      await subscription.cancel();
    }
  } finally {
    await pool.shutdown();
  }
  print('');
}

// ============================================================================
// Example 6: Lifecycle Hooks
// ============================================================================

/// Observes pool activity through lifecycle hooks.
///
/// Every hook appends a line to an in-memory log, which is printed after
/// the pool has been shut down.
Future<void> lifecycleHooksExample() async {
  print('--- Example 6: Lifecycle Hooks ---');

  final events = <String>[];

  // Each hook records one human-readable line describing what happened.
  final hooks = PoolHooks(
    onJobStart: (jobId, taskName) => events.add('START: $taskName'),
    onJobSuccess: (jobId, result) =>
        events.add('SUCCESS: ${result.taskName} (${result.duration.inMicroseconds}μs)'),
    onJobFailure: (jobId, error) => events.add('FAILURE: ${error.category.name}'),
    onRetry: (jobId, error, attempt, delay) =>
        events.add('RETRY: attempt #$attempt after $delay'),
    onPoolIdle: () => events.add('IDLE: pool is idle'),
    onPoolShutdown: () => events.add('SHUTDOWN: pool shutting down'),
  );

  final pool = WorkerPool.fromBuilder(
    WorkerPoolBuilder('hooks-example').withExecutionMode(ExecutionMode.test).withHooks(hooks),
  );

  try {
    // A task that succeeds on the first attempt.
    final successTask = Task<String, int>.simple(
      name: 'parseNumber',
      executor: (input, ctx) async {
        return int.parse(input);
      },
    );

    // A task that throws on its first attempt and succeeds from the second.
    var attempts = 0;
    final retryTask = Task<void, String>.simple(
      name: 'retryableTask',
      executor: (_, ctx) async {
        attempts += 1;
        if (attempts >= 2) {
          return 'Done';
        }
        throw Exception('Try again');
      },
    );

    // Run the two jobs back to back.
    final firstHandle = pool.submit(successTask, '42');
    await firstHandle.result;

    final secondHandle = pool.submit(
      retryTask,
      null,
      retryPolicy: RetryPolicy.fixed(
        maxAttempts: 3,
        delay: Duration.zero,
      ),
    );
    await secondHandle.result;

    // Short pause before shutting down (the original intent is to let the
    // pool report that it is idle).
    await Future<void>.delayed(const Duration(milliseconds: 50));
  } finally {
    await pool.shutdown();
  }

  print('  Events captured:');
  events.map((event) => '    - $event').forEach(print);
  print('');
}

// ============================================================================
// Example 7: Error Handling
// ============================================================================

/// Shows structured error handling: a failing job yields a result object
/// that is inspected, folded, and mapped instead of being caught as an
/// exception.
Future<void> errorHandlingExample() async {
  print('--- Example 7: Error Handling ---');

  // Parses its input, except for the sentinel 'invalid', which always throws.
  final failingTask = Task<String, int>.simple(
    name: 'failingTask',
    executor: (input, ctx) async {
      if (input != 'invalid') {
        return int.parse(input);
      }
      throw const FormatException('Cannot parse "invalid"');
    },
  );

  final pool = WorkerPool.fromBuilder(
    WorkerPoolBuilder('error-example').withExecutionMode(ExecutionMode.test),
  );

  try {
    final outcome = await pool.submit(failingTask, 'invalid').result;

    // The outcome is inspected via isSuccess/isFailure rather than try/catch.
    print('  Is failure: ${outcome.isFailure}');

    // Error details are only present on a failed outcome.
    final failure = outcome.errorOrNull;
    if (failure != null) {
      print('  Error category: ${failure.category.name}');
      print('  Error message: ${failure.message}');
      print('  Original cause: ${failure.cause}');
    }

    // fold: collapse either branch into a single value.
    final summary = outcome.fold(
      onSuccess: (value) => 'Got value: $value',
      onFailure: (err) => 'Error: ${err.category.name}',
    );
    print('  Fold result: $summary');

    // map: transforms a success value; this outcome is a failure.
    final doubled = outcome.map((value) => value * 2);
    print('  Mapped still failure: ${doubled.isFailure}');

    // List every error category the library defines.
    print('  Error categories available:');
    JobErrorCategory.values.map((category) => '    - ${category.name}').forEach(print);
  } finally {
    await pool.shutdown();
  }
  print('');
}

// ============================================================================
// Example 8: Custom Pool Configuration
// ============================================================================

/// Shows the configuration options available on [WorkerPoolBuilder].
///
/// Builds a customized pool, prints the configuration it ended up with, and
/// lists the factory presets. The pool is shut down even if reading its
/// configuration throws, matching the other examples in this file.
Future<void> customPoolConfigurationExample() async {
  print('--- Example 8: Custom Pool Configuration ---');

  // Highly customized pool
  final pool = WorkerPool.fromBuilder(
    WorkerPoolBuilder('custom-pool')
        // Number of concurrent workers
        .withWorkers(4)
        // Execution mode: main (async), isolate (parallel), test (sync)
        .withExecutionMode(ExecutionMode.main)
        // Maximum jobs that can wait in queue
        .withMaxQueueSize(100)
        // What to do when queue is full
        .withOverflowStrategy(OverflowStrategy.dropOldest)
        // Default priority for jobs without explicit priority
        .withDefaultPriority(Priority.normal)
        // Default retry policy for all jobs
        .withRetryPolicy(
          RetryPolicy.exponentialBackoff(
            maxAttempts: 3,
            initialDelay: const Duration(milliseconds: 100),
          ),
        ),
  );

  try {
    print('  Pool created with:');
    print('    - Name: ${pool.name}');
    print('    - Workers: ${pool.config.workerCount}');
    print('    - Execution mode: ${pool.config.executionMode.name}');
    print('    - Max queue size: ${pool.config.maxQueueSize}');
    print('    - Overflow strategy: ${pool.config.overflowStrategy.name}');

    // Factory presets for common use cases
    print('  Factory presets:');
    print('    - WorkerPool.cpu() - Isolate execution for compute');
    print('    - WorkerPool.io() - Main isolate for async I/O');
    print('    - WorkerPool.ui() - Limited workers for UI updates');
  } finally {
    // Release the pool on every exit path.
    await pool.shutdown();
  }
  print('');
}

// ============================================================================
// Example 9: Global Default Pool
// ============================================================================

/// Shows usage of the global Loom singleton for convenience.
///
/// Submits a job to the default pool, prints some information about it,
/// swaps in a custom pool, and finally shuts the global pool down. The
/// shutdown runs in a `finally` block so a failure earlier in the example
/// does not leave the global pool running.
Future<void> globalPoolExample() async {
  print('--- Example 9: Global Default Pool ---');

  final task = Task<int, int>.simple(
    name: 'double',
    executor: (input, ctx) async => input * 2,
  );

  try {
    // Use the global default pool (lazily created)
    final handle = Loom.defaultPool.submit(task, 21);
    final result = await handle.result;
    print('  Result from default pool: ${result.valueOrThrow}');

    // Access pool info
    print('  Default pool name: ${Loom.defaultPool.name}');
    print('  Default pool state: ${Loom.defaultPool.state.name}');

    // Replace with custom pool if needed
    final customPool = WorkerPool.fromBuilder(
      WorkerPoolBuilder('my-global-pool').withWorkers(2).withExecutionMode(ExecutionMode.test),
    );
    await Loom.setDefaultPool(customPool);
    print('  Replaced default pool with: ${Loom.defaultPool.name}');
  } finally {
    // Always clean up at app shutdown
    await Loom.shutdown();
  }
  print('  Global pool shut down');
  print('');
}
1 like
160 points
0 downloads

Publisher

unverified uploader

Weekly Downloads

A lightweight, flexible worker pool framework for Dart with priority scheduling, retry policies, and graceful cancellation.

Homepage
Repository (GitHub)
View/report issues

Topics

#concurrency #worker-pool #async #isolate #task-queue

Documentation

API reference

License

MIT (license)

More

Packages that depend on loom