async_task 1.0.14 async_task: ^1.0.14 copied to clipboard
Asynchronous tasks and parallel executors (similar to thread pools) for all Dart platforms (transparently), without having to deal with `Isolate` complexity.
async_task #
This package brings asynchronous tasks and parallel executors (similar to classic thread pools) for all Dart platforms (JS/Web, Flutter, VM/Native) through transparent internal implementations, based on dart:isolate or only dart:async, without having to deal with the Isolate complexity.
Motivation #
Dart parallelism is based on asynchronous, non-blocking and thread-safe code. This creates a language that facilitates the creation of safe concurrency code, but all running in the same thread, using only 1 thread/core of a device.
If you want to use more than 1 thread/core of a device, Dart VM/Native has Isolate, but the paradigm is not easy to use like classical Thread Pools, or like environments that shares all the memory/objects between threads.
This package was created to facilitate the creation and execution of multiple tasks asynchronously in all Dart Platforms, avoiding platform specific codes by the developer.
Usage #
import 'dart:async';
import 'package:async_task/async_task.dart';
void main() async {
// The tasks to execute:
var tasks = [
PrimeChecker(8779),
PrimeChecker(1046527),
PrimeChecker(3139581), // Not Prime
PrimeChecker(16769023),
];
// Instantiate the task executor:
var asyncExecutor = AsyncExecutor(
sequential: false, // Non-sequential tasks.
parallelism: 2, // Concurrency with 2 threads.
taskTypeRegister: _taskTypeRegister, // The top-level function to register the tasks types.
);
// Enable logging output:
asyncExecutor.logger.enabled = true ;
// Execute all tasks:
var executions = asyncExecutor.executeAll(tasks);
// Wait tasks executions:
await Future.wait(executions);
for (var task in tasks) {
var n = task.n; // Number to check for prime.
var prime = task.result; // Task result: true if is prime.
print('$n\t-> $prime \t $task');
}
}
// This top-level function returns the tasks types that will be registered
// for execution. Task instances are returned, but won't be executed and
// will be used only to identify the task type:
List<AsyncTask> _taskTypeRegister() => [PrimeChecker(0)];
// A task that checks if a number is prime:
class PrimeChecker extends AsyncTask<int, bool> {
// The number to check if is prime.
final int n;
PrimeChecker(this.n);
// Instantiates a `PrimeChecker` task with `parameters`.
@override
AsyncTask<int, bool> instantiate(int parameters) {
return PrimeChecker(parameters);
}
// The parameters of this task:
@override
int parameters() {
return n;
}
// Runs the task code:
@override
FutureOr<bool> run() {
return isPrime(n);
}
// A simple prime check function:
bool isPrime(int n) {
if (n < 2) return false;
var limit = n ~/ 2;
for (var p = 2; p < limit; ++p) {
if (n % p == 0) return false;
}
return true;
}
}
Output:
[INFO] Starting AsyncExecutor{ sequential: false, parallelism: 2, executorThread: _AsyncExecutorMultiThread{ totalThreads: 2, queue: 0 } }
[INFO] Starting _AsyncExecutorMultiThread{ totalThreads: 2, queue: 0 }
[INFO] Created _IsolateThread{ id: 2 ; registeredTasksTypes: [PrimeChecker] }
[INFO] Created _IsolateThread{ id: 1 ; registeredTasksTypes: [PrimeChecker] }
8779 -> true PrimeChecker(8779)[finished]{ result: true ; executionTime: 2 ms }
1046527 -> true PrimeChecker(1046527)[finished]{ result: true ; executionTime: 2 ms }
3139581 -> false PrimeChecker(3139581)[finished]{ result: false ; executionTime: 0 ms }
16769023 -> true PrimeChecker(16769023)[finished]{ result: true ; executionTime: 35 ms }
SharedData #
The class SharedData
facilitates and optimizes data shared between tasks.
The main advantage of data encapsulated with SharedData
is to avoid multiple
messages, with the same data, between threads/isolates, avoiding concurrency
performance issues and multiple duplicated objects in memory (a GC bottleneck).
Here's an example of a task using SharedData
:
// A task that checks if a number is prime:
class PrimeChecker extends AsyncTask<int, bool> {
// The number to check if is prime.
final int n;
// A list of known primes, shared between tasks.
final SharedData<List<int>, List<int>> knownPrimes;
PrimeChecker(this.n, this.knownPrimes);
// Instantiates a `PrimeChecker` task with `parameters` and `sharedData`.
@override
PrimeChecker instantiate(int parameters, [Map<String, SharedData>? sharedData]) {
return PrimeChecker(
parameters,
sharedData!['knownPrimes'] as SharedData<List<int>, List<int>>,
);
}
// The `SharedData` of this task.
@override
Map<String, SharedData> sharedData() => {'knownPrimes': knownPrimes};
// Loads the `SharedData` from `serial` for each key.
@override
SharedData<List<int>, List<int>> loadSharedData(String key, dynamic serial) {
switch (key) {
case 'knownPrimes':
return SharedData<List<int>, List<int>>(serial);
default:
throw StateError('Unknown key: $key');
}
}
// The parameters of this task:
@override
int parameters() {
return n;
}
// Runs the task code:
@override
FutureOr<bool> run() {
return isPrime(n);
}
// A simple prime check function:
bool isPrime(int n) {
if (n < 2) return false;
// The pre-computed primes, optimizing this checking algorithm:
if (knownPrimes.data.contains(n)) {
return true;
}
// It's sufficient to search for prime factors in the range [1,sqrt(N)]:
var limit = (sqrt(n) + 1).toInt();
for (var p = 2; p < limit; ++p) {
if (n % p == 0) return false;
}
return true;
}
}
The field knownPrimes
above will be shared between tasks. In platforms with support
for dart:isolate knownPrimes
will be sent through an Isolate port only once,
avoiding multiple copies and unnecessary memory allocations.
AsyncTaskChannel #
An AsyncTask
can have a communication channel that can be used to send/received messages
during task execution.
To use a task channel just override channelInstantiator
, than use channelResolved()
inside
the task and await channel()
outside it:
class YourTask extends AsyncTask<String, int> {
// ...
@override
AsyncTaskChannel? channelInstantiator() => AsyncTaskChannel(); // You can extend `AsyncTaskChannel` class if needed.
// ...
@override
FutureOr<int> run() async {
// ...
// get the resolved channel:
var channel = channelResolved()!; // The channel is always resolved inside `run()`.
// Send and wait a message in the channel:
channel.send('some message');
var result = await channel.waitMessage<String>();
// Send and wait in a single method:
var result = await channel.sendAndWaitResponse<String, String>('some message');
// Read a message if available in the queue (non-blocking):
var msgOptional = channel.readMessage<String>();
// ...
}
}
Outside communication with the task:
void main() async {
// ...
// Execute the task:
asyncExecutor.execute(task);
// ...
// Wait for the channel to be resolved:
var channel = (await task.channel())!;
// Wait for a message:
var msg = await channel.waitMessage();
// process msg...
// Send a response:
channel.send('Some response');
// ...
}
An AsyncTaskChannel
is automatically closed when a task finishes (returns its result).
Source #
The official source code is hosted @ GitHub:
Features and bugs #
Please file feature requests and bugs at the issue tracker.
Contribution #
Any help from the open-source community is always welcome and needed:
- Found an issue?
- Please fill a bug report with details.
- Wish a feature?
- Open a feature request with use cases.
- Are you using and liking the project?
- Promote the project: create an article, do a post or make a donation.
- Are you a developer?
- Fix a bug and send a pull request.
- Implement a new feature, like other training algorithms and activation functions.
- Improve the Unit Tests.
- Have you already helped in any way?
- Many thanks from me, the contributors and everybody that uses this project!
If you donate 1 hour of your time, you can contribute a lot, because others will do the same, just be part and start with your 1 hour.
Author #
Graciliano M. Passos: gmpassos@GitHub.