multitasking 2.9.0 copy "multitasking: ^2.9.0" to clipboard
multitasking: ^2.9.0 copied to clipboard

Cooperative multitasking using asynchronous tasks and synchronization primitives, with the ability to safely cancel groups of nested tasks performing I/O wait or listen operations.

Multitasking #

Cooperative multitasking using asynchronous tasks and synchronization primitives, with the ability to safely cancel groups of nested tasks performing I/O wait or listen operations.

Version: 2.9.0

Pub Package Pub Monthly Downloads GitHub Issues GitHub Forks GitHub Stars GitHub License

Producer/Consumer Problem: Demonstration of a monitor and two condition variables operation.
Producer/consumer problem: demonstration of a monitor and two condition variables operation.

About this software #

Cooperative multitasking using asynchronous tasks and synchronization primitives, with the ability to safely cancel groups of nested tasks performing I/O wait or listen operations.
The tasks is implemented using the following standard core classes:

Are the tasks safe and reliable?
Yes, because the tasks have a very simple construction and operating mechanism.
In a few words, the task life cycle can be described as follows:

  • A task is created with an action in the form of a function that must be executed
  • The initial state of a task is the state in which the action has not yet started to execute
  • After receiving a command to start executing an action, the task waits for the action (function) to complete its execution
  • After completing this function, the task (using Completer<T>) puts itself into one of the states indicating the completion of the task
  • After this, the task result (or error) becomes available through a variable future with the value type Future

To simplify working with the task, it itself is an instance of an object that implements the Future interface.
In this case, the task does not replace Future<T> (doesn't reinvent the wheel), it uses the standard Completer<T> and its field future.

Thus, a Task<T> is an object that implements the Future<T> interface by using Completer<T>.
This task only adds the ability (to Future<T>) to start its execution on command and track the completion state of the action.

Very simplified Task code:

class Task<T> implements Future<T> {
  final FutureOr<T> Function() _action;

  final Completer<Result<T>> _taskCompleter = Completer();

  Future<T>? _future;

  Task(this._action);

  Future<T> get future async {
    if (_future != null) {
      return _future as Future<T>;
    }

    if (_state == TaskState.created) {
      throw StateError('Task has not yet been started: ${toString()}');
    }

    final result = await _taskCompleter.future;
    if (result.isValue) {
      final valueResult = result.asValue!;
      final value = valueResult.value;
      final future = Future.value(value);
      _future = future;
      return future;
    } else {
      final errorResult = result.asError!;
      _finalizer.detach(this);
      final error = errorResult.error;
      final stackTrace = errorResult.stackTrace;
      final future = Future<T>.error(error, stackTrace);
      _future = future;
      return future;
    }
  }

  Future<void> start() async {
    unawaited(runZoned(() async {
      try {
        final value = await _action();
        _complete(TaskState.completed, ValueResult(value));
      } catch (e, s) {
        _complete(TaskState.failed, ErrorResult(e, s));
      }
    }));
  }

  @override
  Future<R> then<R>(FutureOr<R> Function(T value) onValue,
      {Function? onError}) {
    return future.then(onValue, onError: onError);
  }

  void _complete(TaskState state, Result<T> result) {
    // Invoke destructor
    // Set task state and and complete `_taskCompleter`
  }
}

The main purpose of tasks is to conveniently manage a large number of asynchronous tasks with nested subtasks running simultaneously and cooperatively, with the ability to perform their soft, controlled, and broadly functional stop (cancellation), and the ability to write a task destructor in the body of the task itself.
In this way, a request to cancel tasks (and all nested subtasks and all internal critically important operations) can be handled in such a way that everything happens harmoniously and completely safely.
A cancellation request is made using a special token. A task cancellation token can be used synchronously (blocking) or asynchronously (via a subscription, which attaches a handler only for the duration of a critical and potentially very long operation).

Practical use #

Tasks are very lightweight objects. The actions performed by tasks are not much slower than those performed by futures.

A [Task] is an object representing some operation that will complete in the future.
Tasks are executed asynchronously and cooperatively.
Cooperative multitasking is a concurrency model where tasks voluntarily yield control (using await).

The result of a task execution is the result of computing the value of the task action. It can be either a value or an exception.
The task itself is an object of [Future] that wraps the result of the computation.
The main difference between the task and the [Future] is as follows:

The task does not begin executing the computation immediately after it is created #

The task supports delayed start. Or it may never even be started.
After the computation is completed, the task captures the result of the computation.

In case of completion with an exception, the task does not propagate this exception to the unhandled exception handler immediately #

This unobserved exception is stored in the relevant task object instance until the task is aware that an exception has been observed.
If the task isn not aware that an exception was observed, this exception will be propagated in the task finalizer ([Finalizer]).
If the finalizer is not executed by runtime (due to Dart SDK limitations), the exception will remain unobserved.
For this reason, due to the limited functionality of the finalizer, it is recommended to always observe task exceptions (detecting, catching, handling).

Exceptions in task can be observed in one of the following ways:

  • await task
  • task.future
  • Task.waitAll()
  • task.asStream() (inherited from [Future])
  • task.catchError() (inherited from [Future])
  • task.then() (inherited from [Future])
  • task.timeout() (inherited from [Future])
  • task.whenComplete() (inherited from [Future])

It all comes down to the fact that when accessing the [future] field of a task, an instance of the [Future] object is created and at that moment its life cycle begins.

Examples of the main features of the Task #

Task have features that extend, complement, or modify the functionality of futures.

Remark: All examples below were run during the creation of this document and contain actual output to the standard output streams (stdout and stderr).

The task do not throw exceptions at the completion point in case of unsuccessful completion #

Example with Future:

example/example_future.dart

import 'dart:async';

Future<void> main() async {
  final task = Future<int>(() => throw 'Error');

  print('Do some work');
  await Future<void>.delayed(Duration(seconds: 1));
  print('Work completed');

  try {
    final result = await task;
    print('Result: $result');
  } catch (e) {
    print(e);
  }
}

Output:

Do some work
Unhandled exception:
Error
#0      main.<anonymous closure> (file:///home/andrew/prj/multitasking/example/example_future.dart:4:34)
#1      new Future.<anonymous closure> (dart:async/future.dart:260:40)
#2      Timer._createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:18:15)
#3      _Timer._runTimers (dart:isolate-patch/timer_impl.dart:423:19)
#4      _Timer._handleMessage (dart:isolate-patch/timer_impl.dart:454:5)
#5      _RawReceivePort._handleMessage (dart:isolate-patch/isolate_patch.dart:193:12)

The same example with task:

example/example_task_await.dart

import 'dart:async';

import 'package:multitasking/multitasking.dart';

Future<void> main() async {
  final task = Task.run<int>(() => throw 'Error');

  print('Do some work');
  await Future<void>.delayed(Duration(seconds: 1));
  print('Work completed');

  try {
    final result = await task;
    print('Result: $result');
  } catch (e) {
    print(e);
  }
}

Output:

Do some work
Work completed
Error

A failed task does not affect the execution of other code (Do some work) if the task object instance is referenced.
The task will not throw an exception until the executing code accesses the future field (directly or indirectly, e.g. using await task).

If the executing code do not access the future field and there are no references to the task object instance, an exception will be thrown during garbage collection when the task is finalized.
Or it will never be thrown if the task finalization will not be performed (e.g. when the application terminates its work).

For the current task, it is possible to specify the onExit handler inside the task body #

The OnExit handler can be used to ensure the execution of some logical actions.

Example of OnExit handler:

example/example_task_on_exit.dart

import 'dart:async';

import 'package:multitasking/multitasking.dart';

Future<void> main() async {
  final task = Task.run<int>(() {
    Object? handle;

    Task.onExit((task) {
      print('$task exit with status: \'${task.state.name}\'');
      if (handle != null) {
        print('$task frees up: \'handle\'');
      }
    });

    handle = Object();
    throw 'Error';
  });

  print('Do some work');
  await Future<void>.delayed(Duration(seconds: 1));
  print('Work completed');

  try {
    final result = await task;
    print('Result: $result');
  } catch (e) {
    print(e);
  }
}

Output:

Task(0) exit with status: 'failed'
Task(0) frees up: 'handle'
Do some work
Work completed
Error

The name of the task can be specified #

Example of using a task with the name:

example/example_task_name.dart

import 'dart:async';

import 'package:multitasking/multitasking.dart';

Future<void> main() async {
  final task = Task.run<int>(name: 'my task', () {
    return 1;
  });

  print(task.name);
  await task;
}

Output:

my task

The task can be cancelled using a cancellation token #

Canceling a task is a normal action that is supported by the implementation of the mechanism of task functioning.
Canceling a task is safe for the task and the runtime. But that does not mean it is safe for the application.
For this reason, task cancellation is only performed in cases where the developer explicitly allows for cancellation.

There are different ways to handle task cancellation.

token.throwIfCancelled();
if (token.isCancelled) {
  // Handle cancellation
  throw TaskCanceledError();
}

The task can be cancelled during Task.sleep() #

All that is required for this is to pass the token as an argument to method Task.sleep().

Example of cancelling a task during task sleepTask.sleep().

example/example_task_cancel_during_sleep.dart

import 'dart:async';

import 'package:multitasking/multitasking.dart';

Future<void> main(List<String> args) async {
  final cts = CancellationTokenSource();
  final token = cts.token;

  var count = 0;
  final task = Task.run(() async {
    while (true) {
      count++;
      await Task.sleep(0, token);
    }
  });

  Timer(Duration(seconds: 1), cts.cancel);

  try {
    await task;
  } catch (e) {
    print(e);
  }

  _message('count: $count');
}

void _message(String text) {
  final task = Task.current.name ?? '${Task.current}';
  print('$task: $text');
}

Output:

TaskCanceledError
main(): count: 313482

Remark:
The terms parent task and child task are rather arbitrary, since there is no real relationship between these tasks.
They are used to simplify the logical understanding of the interaction of tasks.
The interaction logic is completely determined by the developer.

The task can be cancelled as a group of tasks #

Example of cancelled a group of tasks in case of any failure in any task.

example/example_task_cancel_group_by_failure.dart

import 'dart:async';

import 'package:multitasking/multitasking.dart';

Future<void> main() async {
  final cts = CancellationTokenSource();
  final token = cts.token;
  late AnyTask parent;
  final group = <Task<int>>[];

  void onExit(AnyTask task) {
    if (!task.isCompleted) {
      cts.cancel();
    }
  }

  parent = Task.run<void>(name: 'Parent', () async {
    Task.onExit((task) {
      print('On exit: $task (${task.state.name})');
      onExit(task);
    });

    for (var i = 1; i <= 3; i++) {
      final t = Task<int>(name: 'Child $i', () async {
        Task.onExit((task) {
          print('On exit: $task (${task.state.name})');
          onExit(task);
        });

        final n = i;
        var result = 0;
        for (var i = 0; i < 5; i++) {
          print('${Task.current} works: $i of 4');
          result++;

          token.throwIfCancelled();

          await Future<void>.delayed(Duration(seconds: 2));
          if (n == 1) {
            throw 'Failure in ${Task.current}';
          }
        }

        print('${Task.current} work done');
        return result;
      });

      group.add(t);
    }

    for (final task in group) {
      task.start();
      await Task.sleep();
    }

    await Task.waitAll(group);
  });

  try {
    await parent;
  } catch (e) {
    print(e);
  }
}

Output:

Task('Child 1', 1) works: 0 of 4
Task('Child 2', 2) works: 0 of 4
Task('Child 3', 3) works: 0 of 4
On exit: Task('Child 1', 1) (failed)
Task('Child 2', 2) works: 1 of 4
On exit: Task('Child 2', 2) (cancelled)
Task('Child 3', 3) works: 1 of 4
On exit: Task('Child 3', 3) (cancelled)
On exit: Task('Parent', 0) (failed)
One or more errors occurred. (Failure in Task('Child 1', 1)) (TaskCanceledError) (TaskCanceledError)

The task can be canceled while listening to the stream #

Example of canceling the emulation of the await for statement using ForEach class.

example/example_task_cancel_await_for_stream_emulation.dart

import 'dart:async';

import 'package:multitasking/extra/for_each.dart';
import 'package:multitasking/multitasking.dart';

Future<void> main(List<String> args) async {
  final controller = StreamController<int>.broadcast();

  final stream = controller.stream;
  final cts = CancellationTokenSource();
  final token = cts.token;

  var n = 0;
  Timer.periodic(Duration(seconds: 1), (timer) {
    print('Send event: $n');
    controller.add(n++);
    if (n > 5) {
      print('Stopping the controller');
      timer.cancel();
      controller.close();
    }
  });

  Timer(Duration(seconds: 3), () {
    _message('Cancellation requested');
    cts.cancel();
  });

  final tasks = <Task<int>>[];
  for (var i = 0; i < 3; i++) {
    final task = _doWork(stream, token, testBreak: i == 2);
    tasks.add(task);
  }

  try {
    await Task.waitAll(tasks);
  } catch (e) {
    print(e);
  }

  for (final task in tasks) {
    if (task.isCompleted) {
      final result = await task;
      _message('Result of ${task.toString()}: $result');
    }
  }
}

Task<int> _doWork(Stream<int> stream, CancellationToken token,
    {bool testBreak = false}) {
  return Task.run(() async {
    await Task.sleep();
    final list = <int>[];
    await ForEach(stream, token, (event) {
      _message('Received event: $event');
      list.add(event);
      if (list.length == 1 && testBreak) {
        _message('I want to break free...');
        return false;
      }

      return true;
    }).wait;
    token.throwIfCancelled();
    await Task.sleep();
    _message('Processing data: $list');
    if (testBreak) {
      return list.length;
    }

    await Future<void>.delayed(Duration(seconds: 1));
    return list.length;
  });
}

void _message(String text) {
  final task = Task.current.name ?? '${Task.current}';
  print('$task: $text');
}

Output:

Send event: 0
Task(0): Received event: 0
Task(1): Received event: 0
Task(2): Received event: 0
Task(2): I want to break free...
Task(2): Processing data: [0]
Send event: 1
Task(0): Received event: 1
Task(1): Received event: 1
Send event: 2
Task(0): Received event: 2
Task(1): Received event: 2
main(): Cancellation requested
One or more errors occurred. (TaskCanceledError) (TaskCanceledError)
main(): Result of Task(2): 1
Send event: 3
Send event: 4
Send event: 5
Stopping the controller

The group of tasks can be safely cancelled while working with the network #

An example of group of tasks cancellation while working with the network.

example/example_task_cancel_network.dart

import 'dart:async';

import 'package:http/http.dart' as http;
import 'package:multitasking/extra/for_each.dart';
import 'package:multitasking/multitasking.dart';

Future<void> main() async {
  final cts = CancellationTokenSource();
  final token = cts.token;
  final tasks = <Task<String>>[];
  final rss = <String>[
    'https://rss.nytimes.com/services/xml/rss/nyt/Sports.xml',
    'https://rss.nytimes.com/services/xml/rss/nyt/Science.xml',
    'https://rss.nytimes.com/services/xml/rss/nyt/Movies.xml',
    'https://rss.nytimes.com/services/xml/rss/nyt/Europe.xml',
    'https://rss.nytimes.com/services/xml/rss/nyt/Music.xml'
  ];
  for (var i = 0; i < rss.length; i++) {
    final task = Task.run(() async {
      final url = Uri.parse(rss[i]);
      final bytes = <int>[];
      print('Fetching feed: $url');
      token.throwIfCancelled();
      final client = http.Client();
      try {
        final request = http.Request('GET', url);
        final response = await client.send(request);
        final stream = response.stream;
        await ForEach(stream, token, (event) {
          bytes.addAll(event);
          return true;
        }).wait;
      } finally {
        print('Close client');
        client.close();
      }

      token.throwIfCancelled();
      final result = String.fromCharCodes(bytes);
      print('Processing feed: $url');
      await Future<void>.delayed(Duration(seconds: 1));
      return result;
    });

    tasks.add(task);
  }

  Timer(Duration(seconds: 4), () {
    _message('Cancelling');
    cts.cancel();
  });

  try {
    await Task.waitAll(tasks);
  } catch (e) {
    print(e);
  }

  for (final task in tasks) {
    print('-' * 40);
    print('${task.toString()}: ${task.state.name}');
    if (task.isCompleted) {
      final value = await task;
      final text = value;
      final length = text.length < 80 ? text.length : 80;
      print('Data ${text.substring(0, length)}');
    } else {
      print('No data');
    }
  }
}

void _message(String text) {
  final task = Task.current.name ?? '${Task.current}';
  print('$task: $text');
}

Output:

Fetching feed: https://rss.nytimes.com/services/xml/rss/nyt/Sports.xml
Fetching feed: https://rss.nytimes.com/services/xml/rss/nyt/Science.xml
Fetching feed: https://rss.nytimes.com/services/xml/rss/nyt/Movies.xml
Fetching feed: https://rss.nytimes.com/services/xml/rss/nyt/Europe.xml
Fetching feed: https://rss.nytimes.com/services/xml/rss/nyt/Music.xml
Close client
Processing feed: https://rss.nytimes.com/services/xml/rss/nyt/Sports.xml
Close client
Processing feed: https://rss.nytimes.com/services/xml/rss/nyt/Movies.xml
Close client
Processing feed: https://rss.nytimes.com/services/xml/rss/nyt/Science.xml
main(): Cancelling
Close client
Close client
One or more errors occurred. (TaskCanceledError) (TaskCanceledError)
----------------------------------------
Task(0): completed
Data <?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:dc="http://purl.org/dc/element
----------------------------------------
Task(1): completed
Data <?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:dc="http://purl.org/dc/element
----------------------------------------
Task(2): completed
Data <?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:dc="http://purl.org/dc/element
----------------------------------------
Task(3): cancelled
No data
----------------------------------------
Task(4): cancelled
No data

The tasks can be safely cancelled during long running network operation #

An example of task cancellation during long network operation.

example/example_task_cancel_long_network.dart

import 'dart:async';

import 'package:http/http.dart' as http;
import 'package:multitasking/extra/for_each.dart';
import 'package:multitasking/multitasking.dart';

Future<void> main() async {
  final cts = CancellationTokenSource();
  final token = cts.token;
  final list = [
    (
      '3.11.1',
      'https://storage.googleapis.com/dart-archive/channels/stable/release/3.11.1/sdk/dartsdk-windows-x64-release.zip'
    ),
    (
      '3.10.9',
      'https://storage.googleapis.com/dart-archive/channels/stable/release/3.10.9/sdk/dartsdk-windows-x64-release.zip'
    )
  ];

  final tasks = <AnyTask>[];
  for (final element in list) {
    final url = Uri.parse(element.$2);
    final filename = element.$1;
    final task = _download(url, filename, token);
    tasks.add(task);
  }

  // User request to cancel
  Timer(Duration(seconds: 2), cts.cancel);

  try {
    await Task.waitAll(tasks);
  } catch (e) {
    print('$e');
  }

  for (final task in tasks) {
    if (task.isCompleted) {
      final filename = await task;
      print('Done: $filename');
    }
  }
}

Task<String> _download(Uri url, String filename, CancellationToken token) {
  return Task.run(() async {
    final bytes = <int>[];
    token.throwIfCancelled();
    final client = http.Client();
    try {
      final request = http.Request('GET', url);
      final response = await client.send(request);
      final stream = response.stream;
      await ForEach(stream, token, (event) {
        bytes.addAll(event);
        return true;
      }).wait;
    } finally {
      print('Close client');
      client.close();
      _message('Downloaded: ${bytes.length}');
    }

    token.throwIfCancelled();
    // Save file to disk
    await Future<void>.delayed(Duration(seconds: 1));
    return filename;
  });
}

void _message(String text) {
  final task = Task.current.name ?? '${Task.current}';
  print('$task: $text');
}

Output:

Close client
Task(1): Downloaded: 675990
Close client
Task(0): Downloaded: 611845
One or more errors occurred. (TaskCanceledError) (TaskCanceledError)

Synchronization primitives #

Synchronization primitives are mechanisms that synchronize the execution of multiple operations by locking their execution and putting them into a waiting state.
In essence, these mechanisms imply either waiting for acquire the permit, followed by release this permit, or waiting for a signal without acquiring the permit. Or even waiting for a signal followed by acquiring the permit.

Synchronization primitives do not require the use of tasks, they work with zones (Zone) and can be used in any applications.

Counting semaphore #

A counting semaphore is a synchronization primitive that maintains a counter that represents the number of available permits.
Acquire:
If the counter is 0, the execution of the calling code is blocked until the count becomes greater than 0.
Otherwise, the counter is decremented, and the calling code acquires a permit.
Release:
If any calling code was blocked from executing, that code will continue executing and acquire the permit.
Otherwise, the counter is incremented.

Example with a limit of no more than 3 simultaneously executed operations.

example/example_counting_semaphore.dart

import 'package:multitasking/multitasking.dart';
import 'package:multitasking/synchronization/counting_semaphore.dart';

Future<void> main(List<String> args) async {
  final sem = CountingSemaphore(0, 3);
  final tasks = <AnyTask>[];
  _message('Round with asynchronous entry in the task body');
  for (var i = 0; i < 7; i++) {
    final task = Task.run(name: 'task $i', () async {
      // Asynchronous entry
      await Task.sleep();
      _message('acquire');
      await sem.acquire();
      try {
        _message('  acquired');
        await Task.sleep();
      } finally {
        _message('release');
        await sem.release();
      }
    });

    tasks.add(task);
  }

  try {
    await Task.waitAll(tasks);
  } catch (e) {
    print(e);
  }

  tasks.clear();
  print('-' * 40);
  _message('Round with synchronous entry in the task body');
  for (var i = 0; i < 7; i++) {
    final task = Task.run(name: 'task $i', () async {
      // Synchronous entry
      // await Task.sleep();
      _message('acquire');
      await sem.acquire();
      try {
        _message('  acquired');

        await Task.sleep();
      } finally {
        _message('release');
        await sem.release();
      }
    });

    tasks.add(task);
  }

  try {
    await Task.waitAll(tasks);
  } catch (e) {
    print(e);
  }
}

void _message(String text) {
  final task = Task.current.name ?? '${Task.current}';
  print('$task: $text');
}

Output:

main(): Round with asynchronous entry in the task body
task 0: acquire
task 0:   acquired
task 1: acquire
task 1:   acquired
task 2: acquire
task 2:   acquired
task 3: acquire
task 4: acquire
task 5: acquire
task 6: acquire
task 0: release
task 3:   acquired
task 1: release
task 4:   acquired
task 2: release
task 5:   acquired
task 3: release
task 6:   acquired
task 4: release
task 5: release
task 6: release
----------------------------------------
main(): Round with synchronous entry in the task body
task 0: acquire
task 1: acquire
task 2: acquire
task 3: acquire
task 4: acquire
task 5: acquire
task 6: acquire
task 0:   acquired
task 1:   acquired
task 2:   acquired
task 0: release
task 3:   acquired
task 1: release
task 4:   acquired
task 2: release
task 5:   acquired
task 3: release
task 6:   acquired
task 4: release
task 5: release
task 6: release

Binary semaphore #

A [BinarySemaphore] is a synchronization primitive with an integer value restricted to 0 or 1, representing locked (0) or unlocked (1) states.

Unlike a mutex, a semaphore is a counting-based synchronizer.
If a semaphore is locked, it will be locked even for the current zone.

If a mutex is owned by a zone, it will not block this zone. It will count the number of times it is entered and leaved by zone before releasing.

An example of using a binary semaphore as a locking mechanism:

example/example_binary_semaphore.dart

import 'package:multitasking/multitasking.dart';
import 'package:multitasking/synchronization/binary_semaphore.dart';

Future<void> main(List<String> args) async {
  final sem = BinarySemaphore();
  final tasks = <AnyTask>[];

  for (var i = 0; i < 5; i++) {
    final task = Task.run(name: 'task $i', () async {
      await Task.sleep();
      _message('acquire');
      await sem.acquire();
      try {
        _message('  acquired');
        await Task.sleep();
      } finally {
        _message('release');
        await sem.release();
      }
    });

    tasks.add(task);
  }

  try {
    await Task.waitAll(tasks);
  } catch (e) {
    print(e);
  }
}

void _message(String text) {
  final task = Task.current.name ?? '${Task.current}';
  print('$task: $text');
}

Output:

task 0: acquire
task 0:   acquired
task 1: acquire
task 2: acquire
task 3: acquire
task 4: acquire
task 0: release
task 1:   acquired
task 1: release
task 2:   acquired
task 2: release
task 3:   acquired
task 3: release
task 4:   acquired
task 4: release

Condition variable #

A ConditionVariable is a synchronization primitive that allows to wait for a particular condition to become true before proceeding.
It is always used in conjunction with a locking to safely manage access to the shared data and prevent race conditions.

An example of using two condition variables in conjunction with a binary semaphore (as a synchronization mechanism):

example/example_condition_variable.dart

import 'dart:collection';

import 'package:multitasking/multitasking.dart';
import 'package:multitasking/synchronization/binary_semaphore.dart';
import 'package:multitasking/synchronization/condition_variable.dart';

Future<void> main(List<String> args) async {
  final lock = BinarySemaphore();
  final notEmpty = ConditionVariable(lock);
  final notFull = ConditionVariable(lock);
  const capacity = 4;
  final products = Queue<int>();
  var productId = 0;
  var produced = 0;
  var consumed = 0;
  const count = 3;

  final producer = Task.run(name: 'producer', () async {
    for (var i = 0; i < count; i++) {
      await Future<void>.delayed(Duration(milliseconds: 50));
      final product = productId++;
      produced++;
      _message('produced: $product');
      _message('lock.acquire()');
      await lock.acquire();
      _message('lock.acquired');
      try {
        while (products.length == capacity) {
          _message('notFull.wait()');
          await notFull.wait();
        }

        _message('added product: $product');
        products.add(product);
        _message('products: $products');
        _message('notEmpty.notifyAll()');
        await notEmpty.notifyAll();
      } finally {
        _message('lock.release()');
        await lock.release();
      }
    }
  });

  final consumer = Task.run(name: 'consumer', () async {
    for (var i = 0; i < count; i++) {
      int? product;
      _message('lock.acquire()');
      await lock.acquire();
      _message('lock.acquired');
      try {
        while (products.isEmpty) {
          _message('notEmpty.wait()');
          await notEmpty.wait();
        }

        product = products.removeFirst();
        _message('removed product: $product');
        _message('products: $products');
        _message('notFull.notifyAll()');
        await notFull.notifyAll();
      } finally {
        _message('lock.release()');
        await lock.release();
      }

      await Future<void>.delayed(Duration(milliseconds: 200));
      _message('consumed product: $product');
      consumed++;
    }
  });

  await Task.waitAll([consumer, producer]);

  _message('produced: $produced');
  _message('consumed: $consumed');
}

void _message(String text) {
  final task = Task.current.name ?? '${Task.current}';
  print('$task: $text');
}

Output:

consumer: lock.acquire()
consumer: lock.acquired
consumer: notEmpty.wait()
producer: produced: 0
producer: lock.acquire()
producer: lock.acquired
producer: added product: 0
producer: products: {0}
producer: notEmpty.notifyAll()
producer: lock.release()
consumer: removed product: 0
consumer: products: {}
consumer: notFull.notifyAll()
consumer: lock.release()
producer: produced: 1
producer: lock.acquire()
producer: lock.acquired
producer: added product: 1
producer: products: {1}
producer: notEmpty.notifyAll()
producer: lock.release()
producer: produced: 2
producer: lock.acquire()
producer: lock.acquired
producer: added product: 2
producer: products: {1, 2}
producer: notEmpty.notifyAll()
producer: lock.release()
consumer: consumed product: 0
consumer: lock.acquire()
consumer: lock.acquired
consumer: removed product: 1
consumer: products: {2}
consumer: notFull.notifyAll()
consumer: lock.release()
consumer: consumed product: 1
consumer: lock.acquire()
consumer: lock.acquired
consumer: removed product: 2
consumer: products: {}
consumer: notFull.notifyAll()
consumer: lock.release()
consumer: consumed product: 2
main(): produced: 3
main(): consumed: 3

Reentrant lock #

A ReentrantLock is a synchronization primitive that works like a mutex.
It blocks execution of all zones that do not own this lock.
The zone that acquired the permit becomes the owner of this lock.
The zone owner can enter and exit as long as it holds this lock.

An example of reentering a ReentrantLock.

example/example_reentrant_lock.dart

import 'package:multitasking/multitasking.dart';
import 'package:multitasking/synchronization/reentrant_lock.dart';

Future<void> main(List<String> args) async {
  final lock = ReentrantLock();
  var count = 0;

  Future<void> func(int i) async {
    await lock.acquire();
    try {
      await Future<void>.delayed(Duration(milliseconds: 50));
      count++;
      _message('Increment counter: $count');
      if (i + 1 < 3) {
        await func(i + 1);
      }
    } finally {
      await lock.release();
    }
  }

  final tasks = <AnyTask>[];
  for (var i = 0; i < 3; i++) {
    final t = Task.run(() => func(0));
    tasks.add(t);
  }

  await Task.waitAll(tasks);
}

void _message(String text) {
  final task = Task.current.name ?? '${Task.current}';
  print('$task: $text');
}

Output:

Task(0): Increment counter: 1
Task(0): Increment counter: 2
Task(0): Increment counter: 3
Task(1): Increment counter: 4
Task(1): Increment counter: 5
Task(1): Increment counter: 6
Task(2): Increment counter: 7
Task(2): Increment counter: 8
Task(2): Increment counter: 9

Lock interface #

Lock is an interface that simplifies the use of locking primitives.
For example, this interface is implemented by the classes BinarySemaphore and ReentrantLock.
These classes can be used for exclusive locking.

An example of using a binary semaphore as a locking mechanism.

example/example_lock.dart

import 'package:multitasking/multitasking.dart';
import 'package:multitasking/synchronization/binary_semaphore.dart';

Future<void> main(List<String> args) async {
  final sem = BinarySemaphore();
  final tasks = <AnyTask>[];
  for (var i = 0; i < 3; i++) {
    final task = Task.run(() async {
      await sem.lock(() async {
        _message('Enter');
        await Future<void>.delayed(Duration(milliseconds: 100));
        _message('Leave');
      });
    });

    tasks.add(task);
  }

  await Task.waitAll(tasks);
}

void _message(String text) {
  final task = Task.current.name ?? '${Task.current}';
  print('$task: $text');
}

Output:

Task(0): Enter
Task(0): Leave
Task(1): Enter
Task(1): Leave
Task(2): Enter
Task(2): Leave

Multiple write single read object #

A MultipleWriteSingleReadObjectis a synchronized object.

If an object is not held by one or more writers, then readers can access the value of and object (using the [read] method) without any delay, having previously checked the state of the object by reading the value [isLocked].

If a object is held by one or more writers, then readers must waiting for the write operations to complete using the [wait] method.
After that, a value can be accessed immediately using the [read] method.

If an object is held by one or more readers and a write operation is requested, the writer will wait for all previous read and write operations.

An example of reading and writing a shared object simultaneously.

example/example_multiple_write_single_read_object.dart

import 'package:multitasking/multitasking.dart';
import 'package:multitasking/synchronization/multiple_write_single_read_object.dart';

Future<void> main(List<String> args) async {
  final object = MultipleWriteSingleReadObject(0);
  final tasks = <AnyTask>[];

  void scheduleTask(int ms, Future<void> Function() action) {
    final t = Task.run<void>(() async {
      await Task.sleep(ms);
      await action();
    });
    tasks.add(t);
  }

  void scheduleRead(int ms) {
    scheduleTask(ms, () async {
      final int v;
      var isLocked = false;
      if (object.isLocked) {
        isLocked = true;
        _message('wait read');
        await object.wait();
      }

      final mode = isLocked ? 'read (after wait)' : 'read';
      v = object.read();
      _message('$mode $v');
    });
  }

  void scheduleWrite(int ms) {
    scheduleTask(ms, () {
      _message('wait write');
      return object.write((value) async {
        await Future<void>.delayed(Duration(milliseconds: 100));
        final v = ++value;
        _message('write $v');
        return v;
      });
    });
  }

  scheduleRead(0);
  scheduleWrite(0);
  scheduleWrite(0);
  scheduleRead(0);
  scheduleRead(200);
  scheduleRead(400);

  await Task.waitAll(tasks);
}

void _message(String text) {
  final task = Task.current.name ?? '${Task.current}';
  print('$task: $text');
}

Output:

Task(0): read 0
Task(1): wait write
Task(2): wait write
Task(3): wait read
Task(1): write 1
Task(4): wait read
Task(2): write 2
Task(3): read (after wait) 2
Task(4): read (after wait) 2
Task(5): read 2

0
likes
160
points
336
downloads

Publisher

unverified uploader

Weekly Downloads

Cooperative multitasking using asynchronous tasks and synchronization primitives, with the ability to safely cancel groups of nested tasks performing I/O wait or listen operations.

Repository (GitHub)
View/report issues

Topics

#parallelism #concurrency #cancellation #synchronization #mutex

Documentation

API reference

License

BSD-3-Clause (license)

Dependencies

async, meta, stack_trace

More

Packages that depend on multitasking