Worker Pool
A Dart/Flutter package for managing a pool of isolates to execute tasks concurrently, with support for resource management, constant sharing, and proper cleanup.
Features
- Isolate Pool Management: Efficiently manages a pool of Dart isolates for concurrent task execution
- Resource Management: Initialize and clean up resources in isolates with custom finalizers
- Constant Sharing: Share constants between the main isolate and the worker isolates
- Task Queue: Automatically queues tasks when all isolates are busy
- Timeout Handling: Configurable timeouts for task execution
- Health Monitoring: Automatic worker restart based on task count or inactivity
- Function Registration: Register functions that can be executed in isolates
- Statistics: Get pool statistics for monitoring performance
Getting Started
Prerequisites
- Dart SDK ^3.8.1
- Flutter >=1.17.0 (for Flutter projects)
Installation
Add worker_pool to your pubspec.yaml:
dependencies:
  worker_pool:
    path: /path/to/worker_pool
Or, if the package is published on pub.dev:
dependencies:
  worker_pool: last_version # replace with the latest published version
Usage
Basic Setup
import 'package:worker_pool/worker_pool.dart';
// Initialize the worker pool
// By default, `poolSize` is the number of processors on the machine.
final config = WorkerPoolConfig(
  isolateTimeout: Duration(seconds: 30), // Task timeout
);
await WorkerPool.initialize(config);
// Get the instance
final pool = WorkerPool.instance;
Function Registration
Functions must be registered during initialization before they can be executed in isolates:
// Define a function that can be executed in an isolate
Future<int> computeSquare(int number) async {
  // Simulate some computation
  await Future.delayed(Duration(milliseconds: 100));
  return number * number;
}
// Register the function during initialization
final config = WorkerPoolConfig(
  poolSize: 4,
  predefinedFunctions: {
    'computeSquare': computeSquare,
  },
);
await WorkerPool.initialize(config);
Task Execution
// Submit a task to the pool
try {
  final result = await pool.submit<int, int>('computeSquare', 5);
  print('Result: $result'); // Output: Result: 25
} catch (e) {
  print('Task failed: $e');
}
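Several tasks can be submitted at once and awaited together. The sketch below shows one way to do that with `Future.wait`. The `submitAll` helper, the `SubmitTask` typedef, and the `fakeSquare` stand-in are hypothetical names that are not part of the package; the stand-in exists only so the example runs without a pool.

```dart
/// Hypothetical callback type: submits one argument and returns its result.
/// With a real pool this would wrap a call to `pool.submit`.
typedef SubmitTask<A, R> = Future<R> Function(A argument);

/// Submits every input at once and waits for all results.
/// Results are returned in the same order as [inputs].
Future<List<R>> submitAll<A, R>(SubmitTask<A, R> submit, Iterable<A> inputs) {
  return Future.wait(inputs.map(submit));
}

/// Stand-in for a pooled task (assumption: not the package API).
Future<int> fakeSquare(int n) async => n * n;

Future<void> main() async {
  final results = await submitAll<int, int>(fakeSquare, [1, 2, 3]);
  print(results); // [1, 4, 9]
}
```

With the pool from the snippet above, the callback would presumably be `(n) => pool.submit<int, int>('computeSquare', n)`. According to the feature list, submissions that exceed the number of free isolates are queued rather than rejected.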
Using Constants
Share constants between the main isolate and the worker isolates:
// Define a constant provider
Future<Map<String, dynamic>> provideConstants() async {
  return {
    'apiUrl': 'https://api.example.com',
    'timeout': 5000,
  };
}
// Register the constant provider
final config = WorkerPoolConfig(
  poolSize: 4,
  constantProviders: [provideConstants],
  predefinedFunctions: {
    'computeWithConstants': computeWithConstants,
  },
);
await WorkerPool.initialize(config);
Read the constants inside isolate functions:
Future<String> computeWithConstants(String input) async {
  final apiUrl = IsolateConstantManager.getConstant<String>('apiUrl');
  final timeout = IsolateConstantManager.getConstant<int>('timeout');
  
  // Use constants in computation
  return '$input processed with API: $apiUrl and timeout: $timeout';
}
Resource Management
Initialize and clean up resources in isolates:
// Define a resource initializer
Future<Map<String, dynamic>?> initializeDatabase() async {
  // Initialize database connection
  final db = await Database.connect('database_url');
  return {
    'database': db,
  };
}
// Define a resource finalizer
Future<void> finalizeDatabase(dynamic resource) async {
  if (resource is Database) {
    await resource.close();
  }
}
// Register resource management
final config = WorkerPoolConfig(
  poolSize: 4,
  resourceInitializers: [initializeDatabase],
  resourceFinalizers: {
    'database': finalizeDatabase,
  },
);
await WorkerPool.initialize(config);
Using registered resources in isolate tasks:
// Define a function to execute in an isolate using registered resources
Future<String> processWithDatabase(String data) async {
  // Use IsolateResourceManager to get registered resources
  final database = IsolateResourceManager.getResource<Database>('database');
  
  if (database == null) {
    throw Exception('Database resource not found');
  }
  
  // Use the database resource for operations
  final result = await database.query('SELECT * FROM table WHERE data = ?', [data]);
  return result.toString();
}
// Register this function and initialize the worker pool
final config = WorkerPoolConfig(
  poolSize: 4,
  resourceInitializers: [initializeDatabase],
  resourceFinalizers: {
    'database': finalizeDatabase,
  },
  predefinedFunctions: {
    'processWithDatabase': processWithDatabase,
  },
);
await WorkerPool.initialize(config);
// Submit a task that uses resources
final result = await pool.submit<String, String>('processWithDatabase', 'sample_data');
print('Processing result: $result');
Getting Statistics
Monitor pool performance:
final stats = pool.getStatistics();
print('Total workers: ${stats['totalWorkers']}');
print('Available workers: ${stats['availableWorkers']}');
print('Busy workers: ${stats['busyWorkers']}');
print('Queued tasks: ${stats['queuedTasks']}');
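The raw counters can be turned into simple health indicators. The following is a minimal sketch that assumes `getStatistics()` returns a `Map<String, dynamic>` with numeric values under the keys shown above; `utilization` and `isSaturated` are hypothetical helpers, not part of the package.

```dart
/// Fraction of workers that are currently busy, from 0.0 to 1.0.
/// Returns 0.0 when the map reports no workers.
double utilization(Map<String, dynamic> stats) {
  final total = (stats['totalWorkers'] as num?) ?? 0;
  final busy = (stats['busyWorkers'] as num?) ?? 0;
  if (total <= 0) return 0.0;
  return busy / total;
}

/// True when tasks are waiting and no worker is free to take them.
bool isSaturated(Map<String, dynamic> stats) {
  final queued = (stats['queuedTasks'] as num?) ?? 0;
  final available = (stats['availableWorkers'] as num?) ?? 0;
  return queued > 0 && available == 0;
}

void main() {
  // Sample values in the assumed shape of the statistics map.
  final sample = <String, dynamic>{
    'totalWorkers': 4,
    'availableWorkers': 1,
    'busyWorkers': 3,
    'queuedTasks': 0,
  };
  print(utilization(sample)); // 0.75
  print(isSaturated(sample)); // false
}
```

A pool that stays saturated for long periods may need a larger `poolSize` or smaller tasks.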
Cleanup
Dispose the pool when finished:
await pool.dispose();
Configuration Options
The WorkerPoolConfig class provides several configuration options:
| Option | Description | Default | 
|---|---|---|
| poolSize | Number of isolates to maintain in the pool | Platform.numberOfProcessors | 
| resourceInitializers | List of functions to initialize resources in isolates | [] | 
| resourceFinalizers | Map of finalizer functions for resource cleanup | {} | 
| constantProviders | List of functions to provide constants to isolates | [] | 
| healthCheckInterval | Interval for health checks | 5 minutes | 
| isolateTimeout | Timeout for task execution | 30 seconds | 
| maxTasksPerIsolate | Maximum tasks per isolate before restart | 1000 | 
| predefinedFunctions | Map of functions that can be executed in isolates | {} | 
Additional Information
Error Handling
The worker pool handles errors as follows:
- Task execution errors are propagated to the caller
- A crashed isolate is recovered automatically by restarting its worker
- A task that exceeds the configured timeout fails with a TimeoutException
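On the calling side, these cases can be told apart with separate `catch` clauses. The sketch below is self-contained and does not use the package: `describeOutcome` is a hypothetical helper, and `slowTask` uses `Future.timeout` from `dart:async` as a stand-in for a pooled task that exceeds its timeout.

```dart
import 'dart:async';

/// Runs [task] and maps each outcome to a short status string.
Future<String> describeOutcome(Future<Object?> Function() task) async {
  try {
    final value = await task();
    return 'ok: $value';
  } on TimeoutException {
    // Timeouts get their own branch, for example to retry or to alert.
    return 'timed out';
  } catch (e) {
    // Any other error raised by the task itself.
    return 'failed: $e';
  }
}

/// Stand-in for a task that exceeds its time limit (not the package API).
Future<int> slowTask() {
  return Future<int>.delayed(const Duration(milliseconds: 200), () => 1)
      .timeout(const Duration(milliseconds: 20));
}

Future<void> main() async {
  print(await describeOutcome(() async => 25)); // ok: 25
  print(await describeOutcome(slowTask)); // timed out
  print(await describeOutcome(() async => throw StateError('boom')));
}
```

With a real pool, the closure passed to `describeOutcome` would presumably wrap a `pool.submit` call.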
Performance Considerations
- Tasks should be CPU-intensive to benefit from isolate-based concurrency
- I/O-bound operations are usually served well by async/await on the calling isolate and may not benefit from isolates
- Consider the overhead of data serialization when passing arguments to isolates
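To illustrate the first and last points, here is a small sketch that uses `Isolate.run` from `dart:isolate` directly rather than this package. It offloads a CPU-bound function while passing only a small integer across the isolate boundary. The `countPrimes` function is a hypothetical workload chosen for the example.

```dart
import 'dart:isolate';

/// Deliberately CPU-heavy: counts the primes below [limit] by trial division.
int countPrimes(int limit) {
  var count = 0;
  for (var n = 2; n < limit; n++) {
    var isPrime = true;
    for (var d = 2; d * d <= n; d++) {
      if (n % d == 0) {
        isPrime = false;
        break;
      }
    }
    if (isPrime) count++;
  }
  return count;
}

Future<void> main() async {
  // Only one int is sent into the isolate and one int comes back,
  // so the cost of transferring data stays negligible.
  final primes = await Isolate.run(() => countPrimes(100));
  print(primes); // 25
}
```

`Isolate.run` starts a fresh isolate for each call. A pool avoids that startup cost by reusing isolates, which matters most when many short tasks are submitted.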
Testing
The project includes unit tests. Run them with:
flutter test
Contributing
Contributions are welcome! Please feel free to submit issues and pull requests.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Libraries
- log: Provides a simple logging utility for the worker_pool package.
- worker_pool: A robust and easy-to-use isolate pool for Dart and Flutter.