async_task 1.1.1
Asynchronous tasks and parallel executors (similar to thread pools) for all Dart platforms (transparently), without having to deal with `Isolate` complexity.
async_task #
This package brings asynchronous tasks and parallel executors (similar to classic thread pools) for all Dart platforms (JS/Web, Flutter, VM/Native) through transparent internal implementations, based on dart:isolate or only dart:async, without having to deal with the Isolate complexity.
Motivation #
Dart concurrency is based on asynchronous, non-blocking and thread-safe code. This makes it easy to write safe concurrent code, but all of it runs in the same thread, using only 1 thread/core of a device.
If you want to use more than 1 thread/core of a device, Dart VM/Native has Isolate, but that paradigm is not as easy to use as classical Thread Pools, or as environments that share all the memory/objects between threads.
This package was created to facilitate the creation and execution of multiple tasks asynchronously on all Dart platforms, so that the developer doesn't have to write platform-specific code.
Usage #
import 'dart:async';
import 'package:async_task/async_task.dart';
void main() async {
  // The tasks to execute:
  var tasks = [
    PrimeChecker(8779),
    PrimeChecker(1046527),
    PrimeChecker(3139581), // Not Prime
    PrimeChecker(16769023),
  ];
  // Instantiate the task executor:
  var asyncExecutor = AsyncExecutor(
    sequential: false, // Non-sequential tasks.
    parallelism: 2, // Concurrency with 2 threads.
    taskTypeRegister: _taskTypeRegister, // The top-level function to register the tasks types.
  );
  // Enable logging output:
  asyncExecutor.logger.enabled = true;
  // Execute all tasks:
  var executions = asyncExecutor.executeAll(tasks);
  // Wait tasks executions:
  await Future.wait(executions);
  for (var task in tasks) {
    var n = task.n; // Number to check for prime.
    var prime = task.result; // Task result: true if is prime.
    print('$n\t-> $prime \t $task');
  }
}
// This top-level function returns the tasks types that will be registered
// for execution. Task instances are returned, but won't be executed and
// will be used only to identify the task type:
List<AsyncTask> _taskTypeRegister() => [PrimeChecker(0)];
// A task that checks if a number is prime:
class PrimeChecker extends AsyncTask<int, bool> {
  // The number to check if is prime.
  final int n;
  PrimeChecker(this.n);
  // Instantiates a `PrimeChecker` task with `parameters`.
  @override
  AsyncTask<int, bool> instantiate(int parameters) {
    return PrimeChecker(parameters);
  }
  // The parameters of this task:
  @override
  int parameters() {
    return n;
  }
  // Runs the task code:
  @override
  FutureOr<bool> run() {
    return isPrime(n);
  }
  // A simple prime check function:
  bool isPrime(int n) {
    if (n < 2) return false;
    var limit = n ~/ 2;
    for (var p = 2; p <= limit; ++p) {
      if (n % p == 0) return false;
    }
    return true;
  }
}
Output:
[INFO] Starting AsyncExecutor{ sequential: false, parallelism: 2, executorThread: _AsyncExecutorMultiThread{ totalThreads: 2, queue: 0 } }
[INFO] Starting _AsyncExecutorMultiThread{ totalThreads: 2, queue: 0 }
[INFO] Created _IsolateThread{ id: 2 ; registeredTasksTypes: [PrimeChecker] }
[INFO] Created _IsolateThread{ id: 1 ; registeredTasksTypes: [PrimeChecker] }
8779     -> true 	 PrimeChecker(8779)[finished]{ result: true ; executionTime: 2 ms }
1046527  -> true 	 PrimeChecker(1046527)[finished]{ result: true ; executionTime: 2 ms }
3139581  -> false 	 PrimeChecker(3139581)[finished]{ result: false ; executionTime: 0 ms }
16769023 -> true 	 PrimeChecker(16769023)[finished]{ result: true ; executionTime: 35 ms }
parallelism #
Note that the parallelism parameter of AsyncExecutor shouldn't be greater than the number of CPU cores
(unless you really have a case that justifies a higher number).
The most common error is to set a high value for parallelism, thinking that this will speed up task
execution, when it can actually reduce your application's performance. Each allocated Isolate needs a CPU
core to execute, and a high number of Isolate instances running tasks will become a bottleneck in your system
(especially if you are demanding more Isolate instances than CPU cores).
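One way to follow this advice is to cap the requested value at the number of cores reported by the platform. The sketch below is only an illustration: `cappedParallelism` is a hypothetical helper, not part of async_task, and it relies on `Platform.numberOfProcessors` from dart:io, so the default is not available on the web.

```dart
import 'dart:io' show Platform;
import 'dart:math' show max, min;

/// Hypothetical helper (not part of async_task): caps `requested` at the
/// number of available cores, never returning less than 1.
int cappedParallelism(int requested, {int? cores}) {
  // `cores` can be passed explicitly (e.g. in tests); otherwise the value
  // reported by dart:io is used.
  var available = cores ?? Platform.numberOfProcessors;
  return max(1, min(requested, available));
}

void main() {
  print(cappedParallelism(2, cores: 8)); // 2
  print(cappedParallelism(64, cores: 8)); // 8
  print(cappedParallelism(0, cores: 8)); // 1
  print(cappedParallelism(4)); // Depends on the machine running it.
}
```

The returned value could then be passed as the `parallelism` argument when constructing the `AsyncExecutor`.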
SharedData #
The class SharedData facilitates and optimizes data shared between tasks.
The main advantage of data encapsulated with SharedData is to avoid multiple
messages, with the same data, between threads/isolates, avoiding concurrency
performance issues and multiple duplicated objects in memory (a GC bottleneck).
Here's an example of a task using SharedData:
// A task that checks if a number is prime:
class PrimeChecker extends AsyncTask<int, bool> {
  // The number to check if is prime.
  final int n;
  // A list of known primes, shared between tasks.
  final SharedData<List<int>, List<int>> knownPrimes;
  PrimeChecker(this.n, this.knownPrimes);
  // Instantiates a `PrimeChecker` task with `parameters` and `sharedData`.
  @override
  PrimeChecker instantiate(int parameters, [Map<String, SharedData>? sharedData]) {
    return PrimeChecker(
      parameters,
      sharedData!['knownPrimes'] as SharedData<List<int>, List<int>>,
    );
  }
  // The `SharedData` of this task.
  @override
  Map<String, SharedData> sharedData() => {'knownPrimes': knownPrimes};
  // Loads the `SharedData` from `serial` for each key.
  @override
  SharedData<List<int>, List<int>> loadSharedData(String key, dynamic serial) {
    switch (key) {
      case 'knownPrimes':
        return SharedData<List<int>, List<int>>(serial);
      default:
        throw StateError('Unknown key: $key');
    }
  }
  // The parameters of this task:
  @override
  int parameters() {
    return n;
  }
  // Runs the task code:
  @override
  FutureOr<bool> run() {
    return isPrime(n);
  }
  // A simple prime check function:
  bool isPrime(int n) {
    if (n < 2) return false;
    // The pre-computed primes, optimizing this checking algorithm:
    if (knownPrimes.data.contains(n)) {
      return true;
    }
    
    // It's sufficient to search for factors in the range [2, sqrt(N)]
    // (`sqrt` requires `import 'dart:math';`):
    var limit = (sqrt(n) + 1).toInt();
    for (var p = 2; p < limit; ++p) {
      if (n % p == 0) return false;
    }
    return true;
  }
}
The field knownPrimes above will be shared between tasks. On platforms with support
for dart:isolate, knownPrimes will be sent through an Isolate port only once,
avoiding multiple copies and unnecessary memory allocations.
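The checking logic of the task above can be tried on its own, without an executor. The following is a minimal standalone sketch: it takes the known primes as a plain `List<int>` instead of a `SharedData` wrapper (an assumption made only to keep the example free of package dependencies) and includes the dart:math import that `sqrt` needs.

```dart
import 'dart:math' show sqrt;

/// Standalone version of the prime check. `knownPrimes` is a plain list
/// here, standing in for the `SharedData` field of the task.
bool isPrime(int n, List<int> knownPrimes) {
  if (n < 2) return false;

  // Pre-computed primes short-circuit the search:
  if (knownPrimes.contains(n)) return true;

  // Trial division only needs to go up to sqrt(n):
  var limit = (sqrt(n) + 1).toInt();
  for (var p = 2; p < limit; ++p) {
    if (n % p == 0) return false;
  }
  return true;
}

void main() {
  var knownPrimes = [2, 3, 5, 7, 11, 13];
  for (var n in [1, 2, 9, 13, 8779, 3139581]) {
    print('$n -> ${isPrime(n, knownPrimes)}');
  }
}
```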
AsyncTaskChannel #
An AsyncTask can have a communication channel that can be used to send/receive messages
during task execution.
To use a task channel, just override channelInstantiator, then use channelResolved() inside
the task and await channel() outside it:
class YourTask extends AsyncTask<String, int> {
  // ...
  
  @override
  AsyncTaskChannel? channelInstantiator() => AsyncTaskChannel(); // You can extend `AsyncTaskChannel` class if needed.
  // ...
  @override
  FutureOr<int> run() async {
    // ...
    
    // get the resolved channel:
    var channel = channelResolved()!; // The channel is always resolved inside `run()`.
    
    // Send and wait a message in the channel:
    channel.send('some message');
    var result = await channel.waitMessage<String>();
    
    // Send and wait in a single method (alternative to the two calls above):
    var response = await channel.sendAndWaitResponse<String, String>('some message');
    // Read a message if available in the queue (non-blocking):
    var msgOptional = channel.readMessage<String>();
    
    // ...
  }
}
Outside communication with the task:
void main() async {
  // ...
  
  // Execute the task:
  asyncExecutor.execute(task);
  
  // ...
  
  // Wait for the channel to be resolved:
  var channel = (await task.channel())!;
  
  // Wait for a message:
  var msg = await channel.waitMessage();
  
  // process msg...
  
  // Send a response:
  channel.send('Some response');
  
  // ...
}
An AsyncTaskChannel is automatically closed when a task finishes (returns its result).
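The difference between waiting for a message and reading one without blocking can be illustrated with a toy, single-isolate queue. This is not the package's implementation: `ToyChannel` and its `deliver` method are hypothetical names, and the sketch only mimics the waiting and non-blocking behavior described for `waitMessage` and `readMessage` above, using dart:async alone.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical stand-in for the receiving end of a channel.
class ToyChannel {
  final Queue<Object?> _queue = Queue<Object?>();
  final Queue<Completer<Object?>> _waiters = Queue<Completer<Object?>>();

  /// Delivers a message: hands it to the oldest waiter, or queues it.
  void deliver(Object? message) {
    if (_waiters.isNotEmpty) {
      _waiters.removeFirst().complete(message);
    } else {
      _queue.addLast(message);
    }
  }

  /// Completes with the next message, waiting if none is queued yet.
  Future<T> waitMessage<T>() async {
    if (_queue.isNotEmpty) return _queue.removeFirst() as T;
    var completer = Completer<Object?>();
    _waiters.addLast(completer);
    return (await completer.future) as T;
  }

  /// Returns a queued message, or null if the queue is empty (non-blocking).
  T? readMessage<T>() => _queue.isEmpty ? null : _queue.removeFirst() as T;
}

Future<void> main() async {
  var channel = ToyChannel();

  print(channel.readMessage<String>()); // null

  var pending = channel.waitMessage<String>();
  channel.deliver('hello');
  print(await pending); // hello

  channel.deliver('queued');
  print(channel.readMessage<String>()); // queued
}
```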
Source #
The official source code is hosted @ GitHub:
Features and bugs #
Please file feature requests and bugs at the issue tracker.
Package shared_map #
See also the package shared_map for a way to transparently share data/objects between Isolates.
Contribution #
Any help from the open-source community is always welcome and needed:
- Have an issue? Please file a bug report 👍.
- Feature? Request with use cases 🤝.
- Like the project? Promote, post, or donate 😄.
- Are you a developer? Fix a bug, add a feature, or improve tests 🚀.
- Already helped? Many thanks from me, the contributors and all project users 👏👏👏!
Contribute an hour and inspire others to do the same.
Author #
Graciliano M. Passos: gmpassos@GitHub.
Sponsor #
Don't be shy, show some love, and become our GitHub Sponsor. Your support means the world to us, and it keeps the code caffeinated! ☕✨
Thanks a million! 🚀😄