isolate_workgroup 1.0.0 copy "isolate_workgroup: ^1.0.0" to clipboard
isolate_workgroup: ^1.0.0 copied to clipboard

A Dart package for managing a dynamic group of isolates — supports one-off job dispatch, persistent members, health monitoring, and runtime workgroup resizing.

example/example.dart

// Two minimal samples for isolate_workgroup.
//
//   Sample 1 — dispatch(): parallel one-off jobs.
//              Splits a big numeric range into chunks, computes a partial
//              sum in each worker, and reduces the partials in the main
//              isolate. Demonstrates fire-and-forget jobs whose result
//              you await.
//
//   Sample 2 — addInstance() + invoke(): stateful members.
//              Spawns a `Counter` member in each worker and drives it
//              with a small WorkerCommand vocabulary. Demonstrates
//              persistent state living inside a worker across many calls.

import 'package:isolate_workgroup/isolate_workgroup.dart';

void main(List<String> arguments) async {
  final workgroup = IsolateWorkgroup(4);
  await workgroup.launch();

  try {
    await sample1ParallelReduce(workgroup);
    await sample2StatefulCounters(workgroup);
  } finally {
    workgroup.shutdown();
  }
}

// ─── Sample 1: parallel reduce via dispatch() ────────────────────────────────

Future<void> sample1ParallelReduce(IsolateWorkgroup workgroup) async {
  print('\n=== Sample 1: parallel reduce ===');

  // Sum 1..N split across 4 chunks, reduced in main.
  const total = 1000000;
  const chunks = 4;
  const chunkSize = total ~/ chunks;

  final futures = <Future<ChunkStats>>[
    for (var i = 0; i < chunks; i++)
      workgroup.dispatch(
        SumChunkJob(start: i * chunkSize + 1, end: (i + 1) * chunkSize),
      ),
  ];

  final partials = await Future.wait(futures);
  final sum = partials.fold<int>(0, (acc, p) => acc + p.sum);
  final count = partials.fold<int>(0, (acc, p) => acc + p.count);
  print('Sum 1..$total = $sum  (count=$count, mean=${sum / count})');
}

/// A small sendable record returned from a chunk.
class ChunkStats {
  final int sum;
  final int count;
  const ChunkStats(this.sum, this.count);
}

/// Pure-data job: only sendable fields, executes in a worker.
class SumChunkJob extends WorkgroupJob<ChunkStats> {
  final int start;
  final int end;
  const SumChunkJob({required this.start, required this.end});

  @override
  Future<ChunkStats> execute() async {
    var s = 0;
    for (var i = start; i <= end; i++) {
      s += i;
    }
    return ChunkStats(s, end - start + 1);
  }
}

// ─── Sample 2: stateful Counter members via addInstance() ────────────────────

Future<void> sample2StatefulCounters(IsolateWorkgroup workgroup) async {
  print('\n=== Sample 2: stateful Counters ===');

  // One Counter per worker. Each holds independent state in its isolate.
  final counters = <MemberProxy>[
    for (var i = 0; i < 4; i++)
      await workgroup.addInstance(Counter(), isolateIndex: i),
  ];

  // Seed each counter with a different starting amount.
  for (var i = 0; i < counters.length; i++) {
    await counters[i].invoke(IncrementBy((i + 1) * 10));
  }

  for (var i = 0; i < counters.length; i++) {
    final value = await counters[i].invoke<int>(GetValue());
    print('Worker $i counter starts at $value');
  }

  // Drive counter 0 through 100 increments to show state persists across calls.
  for (var i = 0; i < 100; i++) {
    await counters[0].invoke(IncrementBy(1));
  }
  final finalValue = await counters[0].invoke<int>(GetValue());
  print('Worker 0 after +100 increments: $finalValue');

  // Reset + verify.
  await counters[0].invoke(Reset());
  print('Worker 0 after reset: ${await counters[0].invoke<int>(GetValue())}');

  // Always destroy members you added.
  for (final c in counters) {
    workgroup.destroyInstance(c);
  }
}

// Commands the Counter understands. WorkerCommand fields must be sendable.
class IncrementBy extends WorkerCommand {
  final int amount;
  IncrementBy(this.amount);
}

class GetValue extends WorkerCommand {}

class Reset extends WorkerCommand {}

/// Persistent state that lives inside a worker isolate. The `Counter`
/// instance is sent to the worker once on addInstance(); subsequent
/// invoke() calls run handle() inside the worker.
class Counter extends WorkgroupMember {
  int _value = 0;

  @override
  Future<void> setup() async {
    // Initialize resources here if needed.
  }

  @override
  Future<dynamic> handle(WorkerCommand command) async {
    switch (command) {
      case IncrementBy():
        _value += command.amount;
        return null;
      case GetValue():
        return _value;
      case Reset():
        _value = 0;
        return null;
      default:
        throw ArgumentError('Unknown command: ${command.runtimeType}');
    }
  }
}
0
likes
160
points
90
downloads

Documentation

API reference

Publisher

verified publisherjustbrendan.me

Weekly Downloads

A Dart package for managing a dynamic group of isolates — supports one-off job dispatch, persistent members, health monitoring, and runtime workgroup resizing.

Repository (GitHub)
View/report issues
Contributing

Topics

#isolate #concurrency #parallel #worker-pool #compute

License

BSD-3-Clause, MIT (license)

Dependencies

meta

More

Packages that depend on isolate_workgroup