executorservices 2.0.3

  • Readme
  • Changelog
  • Example
  • Installing
  • 80

Pub.dev License

A library for Dart and Flutter developers.

license.

Description #

It allows you to execute code in isolates or any executor currently supported.

Support concurrent execution of tasks or functions.

Support cleanup of unused isolates.

Support caching of isolate that allow you to reuse them (like thread).

It's extendable.

Usage #

For tasks that return either a value or an error: #

For submitting a top level or static function without arguments to the executorservice. For task that return a value or a future the executorService.submit* return a future, to track the progress of the task.

import "dart:math";
import "package:executorservices/executorservices.dart";

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  final Future<int> futureResult = executorService.submitAction(_randomInt);

  futureResult.then((result) => print(result));
}

int _randomInt() {
  return Random.secure().nextInt(1000);
}

For submitting a top level or static function with one argument to the executorservice.

import "package:executorservices/executorservices.dart";

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  final Future<String> futureResult = executorService.submitCallable(expensiveHelloWorld, "Darel Bitsy");

  futureResult.then((result) => print(result));
}

Future<String> expensiveHelloWorld(final String name) async {
  await Future.delayed(Duration(seconds: 1));
  return "Hellow world $name";
}

For submitting a top level or static function with many other arguments to the executorservice. The executorservice provide method like submitFunction2,submitFunction3,submitFunction4.

import "package:executorservices/executorservices.dart";

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  final Future<String> futureResult = executorService.submitFunction2(
    expensiveHelloWorld,
    "Darel Bitsy",
    23,
  );

  futureResult.then((result) => print(result));
}

Future<String> expensiveHelloWorld(final String name, final int age) async {
  await Future.delayed(Duration(seconds: 1));
  if (age >= 80) {
    return "Hellow world elder $name";
  } else {
    return "Hellow world $name";
  }
}

If you want to run a instance method of a existing class you need to make it extend the Task class. If if you want to have more than five arguments function. If you don't want to have the code as top level or static function.

import "dart:async";
import "dart:isolate";

import "package:executorservices/executorservices.dart";
import "package:http/http.dart" as http;

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  final Future<String> futureResult = executorService.submit(
    GetPost("https://jsonplaceholder.typicode.com/posts", 5),
  );

  futureResult.then(
    (result) => print(
      "Recieved $result and it's running from ${Isolate.current.debugName}",
    ),
  );
}

class GetPost extends Task<String> {
  GetPost(this.apiUrl, this.index);

  final String apiUrl;
  final int index;

  @override
  FutureOr<String> execute() {
    return http
        .get("$apiUrl/$index")
        .then((postResponse) => postResponse.body)
        .then(
      (json) {
        print(
          "about to return $json and it's running from ${Isolate.current.debugName}",
        );
        return json;
      },
    );
  }
}

A example of chaining tasks.

import "dart:async";
import "dart:math";

import "package:executorservices/executorservices.dart";
import "package:http/http.dart" as http;

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  executorService
      .submitFunction2(_fullName, "Darel", "Bitsy")
      .then(
        (fullName) {
          print("Hi, $fullName");
          return executorService.submitAction(randomPostId);
        },
      )
      .then(
        (postId) => executorService.submit(
          GetPost("https://jsonplaceholder.typicode.com/posts", postId),
        ),
      )
      .then(print);
}

int randomPostId() {
  return Random.secure().nextInt(10);
}

Future<String> _fullName(final String firstName, final String lastName) async {
  await Future.delayed(Duration(milliseconds: 500));
  return "$firstName, $lastName";
}

class GetPost extends Task<String> {
  GetPost(this.apiUrl, this.index);

  final String apiUrl;
  final int index;

  @override
  FutureOr<String> execute() {
    return http.get("$apiUrl/$index").then((postResponse) => postResponse.body);
  }
}

For tasks that emit events (Code that return a stream): #

For executing to a top level or static function without arguments that return a stream to the executorservice.

import "dart:async";
import "dart:isolate";
import "dart:math";

import "package:executorservices/executorservices.dart";

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  final Stream<int> streamOfRandomNumber =
      executorService.subscribeToAction(randomGenerator);

  streamOfRandomNumber.listen(
    (newData) => print(
      "Received $newData and it's running on isolate: ${Isolate.current.debugName}",
    ),
    onError: (error) => print(
      "Received ${error.toString()} and it's running on isolate: ${Isolate.current.debugName}",
    ),
    onDone: () => print(
      "Task's done and it's running on isolate: ${Isolate.current.debugName}",
    ),
  );
}

Stream<int> randomGenerator() async* {
  for (var i = 0; i < Random.secure().nextInt(10); i++) {
    await Future.delayed(Duration(seconds: 1));
    print(
      "about to yield $i and it's running from ${Isolate.current.debugName}",
    );
    yield i;
  }
}

Here are the analogical method for other type of top level, or static functions: subscribeToCallable, subscribeToFunction2, subscribeToFunction3, subscribeToFunction4

If you want to run a instance method of a existing class you need to make it extend the SubscribableTask class. If if you want to have more than five arguments function. If you don't want to have the code as top level or static function.

import "dart:async";
import "dart:isolate";

import "package:executorservices/executorservices.dart";
import "package:http/http.dart" as http;

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  final Stream<String> streamOfPosts = executorService.subscribe(
    GetPosts("https://jsonplaceholder.typicode.com/posts", 10),
  );

  streamOfPosts.listen(
    (newData) => print(
      "Received $newData and it's running on isolate: ${Isolate.current.debugName}",
    ),
    onError: (error) => print(
      "Received ${error.toString()} and it's running on isolate: ${Isolate.current.debugName}",
    ),
    onDone: () => print(
      "Task's done and it's running on isolate: ${Isolate.current.debugName}",
    ),
  );
}

class GetPosts extends SubscribableTask<String> {
  GetPosts(this.apiUrl, this.maxPosts);

  final String apiUrl;
  final int maxPosts;

  @override
  Stream<String> execute() async* {
    for (var index = 0; index < maxPosts; index++) {
      final postJson = await http
          .get("$apiUrl/$index")
          .then((postResponse) => postResponse.body);

      print(
        "about to yield $postJson and "
        "it's running from ${Isolate.current.debugName}",
      );

      yield postJson;
    }
  }
}

A example of chaining tasks of different type together.

import "dart:async";
import "dart:io";

import "package:executorservices/executorservices.dart";
import "package:http/http.dart" as http;

void main() {
  final executorService = ExecutorService.newUnboundExecutor();

  executorService
      .subscribeToAction(randomPostIds)
      .asyncMap((randomPostId) => executorService.submit(GetPost(randomPostId)))
      .listen(
        (newData) => print("Received $newData"),
        onError: (error) => print("Received ${error.toString()}"),
        onDone: () {
          print("Task's done");
          exit(0);
        },
      );
}

Stream<int> randomPostIds() async* {
  for (var index = 0; index < 10; index++) {
    await Future.delayed(Duration(milliseconds: 100));
    yield index;
  }
}

class GetPost extends Task<String> {
  GetPost(this.postId);

  final int postId;

  @override
  Future<String> execute() {
    return http
        .get("https://jsonplaceholder.typicode.com/posts/$postId")
        .then((postResponse) => postResponse.body);
  }
}

By default you can't re-submit the same instance of a ongoing Task to a ExecutorService multiple times. Because the result of your submitted task is associated with the task instance identifier. So by default submitting the same instance of a task multiple times will result in unexpected behaviors.

For example, this won't work:

main() {
  final executors = ExecutorService.newUnboundExecutor();
  
  final task = SameInstanceTask();

  executors.submit(task);
  executors.submit(task);
  executors.submit(task);
  
  for (var index = 0; index < 10; index++) {
    executors.submit(task);
  }
}

class SameInstanceTask extends Task<String> {
  @override
  FutureOr<String> execute() async {
    await Future.delayed(Duration(seconds: 5));
    return "Done executing same instance task";
  }
} 

But if you want to submit the same instance of a task multiple times you need to override the Task is clone method.

For example, this will now work:

main() {
  final executors = ExecutorService.newUnboundExecutor();
  
  final task = SameInstanceTask();

  for (var index = 0; index < 10; index++) {
    executors.submit(task);
  }
  
  final taskWithParams = SameInstanceTaskWithParams("Darel Bitsy");

  for (var index = 0; index < 10; index++) {
    executors.submit(taskWithParams);
  }
}

class SameInstanceTask extends Task<String> {
  @override
  FutureOr<String> execute() async {
    await Future.delayed(Duration(minutes: 5));
    return "Done executing same instance task";
  }
  
  @override
  SameInstanceTask clone() {
    return SameInstanceTask();
  }
}

class SameInstanceTaskWithParams extends Task<String> {
  SameInstanceTaskWithParams(this.name);

  final String name;

  @override
  FutureOr<String> execute() async {
    await Future.delayed(Duration(minutes: 5));
    return "Done executing same instance task with name: $name";
  }
    
  @override
  SameInstanceTaskWithParams clone() {
    return SameInstanceTaskWithParams(name);
  }
}

Features and bugs #

Please file feature requests and bugs at the issue tracker.

1.0.0 #

  • Initial version, with support for Isolate executor service.

1.0.1 #

  • Updated the documentation.

1.0.2 #

  • Added support for functions that does not return a future.

1.0.3 #

  • Added support for functions that does not return a future on WEB also.

1.0.4 #

  • Updated the readme to demonstrate the new feature, updated the task classes to support to return FutoreOr.

1.0.5 #

  • Added shutdown for last oldest unused isolate to free memory gradually.

1.0.6 #

  • Added possibility to re-submit the same instance of a task to an ExecutorService.

2.0.0 #

  • Added subscribable task feature.

  • Subscribable tasks can emit many values before completing.

  • Subscribable task's result is a stream, which mean that you can pause, resume and cancel it.

2.0.0+1 #

  • Added example for subscribable tasks.

2.0.1 #

  • Changed LICENSE from GPL 3 to BSD-3 to allow more adoption.

2.0.2 #

  • Cleaner documentation for each methods provided by the library

2.0.2+1 #

  • Removed dependency on meta 1.1.8 to allow integration in old flutter projects.

2.0.3 #

  • Delegated SubscribableTask pause, resume, cancel hooks to the task manager.
  • Add ability to cancel a SubscribableTask before execution.

example/example.dart

import "dart:async";
import "dart:math";

import "package:executorservices/executorservices.dart";
import "package:http/http.dart" as http;

import "utils.dart";

void main() {
  final executorService = ExecutorService.newUnboundExecutor();

  executorService
      .submit(
        GetJsonFromUrlTask("https://jsonplaceholder.typicode.com/posts/1"),
      )
      .then((data) => printRunningIsolate("GetJsonFromUrlTask:result\n$data"));

  executorService
      .submitAction(onShotFunction)
      .then((_) => printRunningIsolate("onShotFunction:done"));

  executorService.submitCallable(getRandomNumber, 10).then(
        (number) => printRunningIsolate(
          "getRandomNumber:result:$number",
        ),
      );

  executorService.submitCallable(getRandomNumberSync, 1000000).then(
        (number) => printRunningIsolate(
          "getRandomNumberSync:result:$number",
        ),
      );

  executorService
      .submitFunction2(getFullName, "Darel", "Bitsy")
      .then((result) => printRunningIsolate("getFullName:result:$result"));

  executorService
      .submitFunction3(greet, "Darel", "Bitsy", "bdeg")
      .then((result) => printRunningIsolate("greet:result:$result"));

  executorService.subscribeToCallable(getPosts, 10).listen(
        (number) => print("event received: $number"),
        onError: (error) => print("error received $error"),
        onDone: () => print("task is done"),
      );

  executorService
      .subscribeToCallable(getPosts, 10)
      .asyncMap(
        (post) => executorService.submitCallable(getRandomNumber, post.length),
      )
      .asyncMap(
        (number) =>
            executorService.submitCallable(getRandomNumberSync, number + 1),
      )
      .listen(
        (number) => print("event received: $number"),
        onError: (error) => print("error received $error"),
        onDone: () => print("task is done"),
      );
}

Stream<String> getPosts(final int max) async* {
  for (var index = 0; index < max; index++) {
    final post = await http.get(
      "https://jsonplaceholder.typicode.com/posts/$index",
    );

    yield post.body;
  }
}

void onShotFunction() async {
  printRunningIsolate("onShotFunction:enter");
  await Future.delayed(Duration(seconds: 3));
}

Future<int> getRandomNumber(final int max) async {
  printRunningIsolate("getRandomNumber:enter");
  await Future.delayed(Duration(seconds: 2));
  return Random.secure().nextInt(max);
}

int getRandomNumberSync(final int max) {
  printRunningIsolate("getRandomNumber:enter");
  return Random.secure().nextInt(max);
}

Future<String> getFullName(final String firstName, final String lastName) {
  printRunningIsolate("getRandomNumber");
  return Future.delayed(Duration(seconds: 1))
      .then((_) => "$firstName $lastName");
}

Future<String> greet(
  final String firstName,
  final String lastName,
  final String surname,
) {
  printRunningIsolate("greet");
  return Future.delayed(Duration(seconds: 2))
      .then((_) => "Hello $firstName $lastName $surname");
}

class GetJsonFromUrlTask extends Task<String> {
  GetJsonFromUrlTask(this.url);

  final String url;

  @override
  FutureOr<String> execute() {
    return http.get(url).then((response) {
      printRunningIsolate(url);
      return response.body;
    });
  }
}

Use this package as a library

1. Depend on it

Add this to your package's pubspec.yaml file:


dependencies:
  executorservices: ^2.0.3

2. Install it

You can install packages from the command line:

with pub:


$ pub get

with Flutter:


$ flutter pub get

Alternatively, your editor might support pub get or flutter pub get. Check the docs for your editor to learn more.

3. Import it

Now in your Dart code, you can use:


import 'package:executorservices/executorservices.dart';
  
Popularity:
Describes how popular the package is relative to other packages. [more]
61
Health:
Code health derived from static analysis. [more]
100
Maintenance:
Reflects how tidy and up-to-date the package is. [more]
100
Overall:
Weighted score of the above. [more]
80
Learn more about scoring.

We analyzed this package on Jan 19, 2020, and provided a score, details, and suggestions below. Analysis was completed with status completed using:

  • Dart: 2.7.0
  • pana: 0.13.4

Dependencies

Package Constraint Resolved Available
Direct dependencies
Dart SDK >=2.4.0 <3.0.0
meta ^1.1.7 1.1.8
Dev dependencies
flutter_code_style ^1.6.5
mockito ^4.1.1
test ^1.9.3