isolate_stream library

Support for running a generator function in a separate Isolate, with flow control.

Sample usage, with FizzBuzz as a stand-in for a computationally intensive series:

import 'dart:async';
import 'package:convert/convert.dart';
import 'package:intl/intl.dart';
import 'package:jovial_misc/io_utils.dart';
import 'package:jovial_misc/isolate_stream.dart';

///
/// Example of using [IsolateStream] to run a computationally-intensive
/// generator function in an isolate.  We use FizzBuzz as a
/// stand-in for a computationally intensive series of values.
///
Future<void> isolate_stream_example() async {
  const max = 25;
  final fmt = NumberFormat();
  const iterationPause = Duration(milliseconds: 250);
  print('Generating FizzBuzz sequence up to ${fmt.format(max)}');

  final stream = IsolateStream<String>(FizzBuzzGenerator(max));
  // Our stream will be limited to 11 strings in the buffer at a time.
  for (var iter = StreamIterator(stream); await iter.moveNext();) {
    print(iter.current);
    await Future<void>.delayed(iterationPause);
  }
  // Note that the producer doesn't run too far ahead of the consumer,
  // because the buffer is limited to 30 strings.
}

/// The generator that runs in a separate isolate.
class FizzBuzzGenerator extends IsolateStreamGenerator<String> {
  final int _max;

  FizzBuzzGenerator(this._max) {
    print('FizzBuzzGenerator constructor.  Note that this only runs once.');
    // This demonstrats that when FizzBuzzGenerator is sent to the other
    // isolate, the receiving isolate does not run the constructor.
  }

  @override
  Future<void> generate() async {
    for (var i = 1; i <= _max; i++) {
      var result = '';
      if (i % 3 == 0) {
        result = 'Fizz';
      }
      if (i % 5 == 0) {
        result += 'Buzz';
      }
      print('        Generator sending $i $result');
      if (result == '') {
        await sendValue(i.toString());
      } else {
        await sendValue(result);
      }
    }
  }

  @override
  int sizeOf(String value) => 1;        // 1 entry

  @override
  int get bufferSize  => 7;            // Buffer up to 7 entries
}

///
/// Run the examples
///
void main() async {
  await data_io_stream_example();
  print('');
  await isolate_stream_example();
}

Classes

IsolateByteStreamGenerator
An IsolateStreamGenerator for building an IsolateStream of bytes, represented by Uint8List instances. This could be used e.g. to simulate a large amount of data coming from a File or a Socket. As a convenience, this class implements sizeOf to count bytes, and bufferSize to give a reasonably large buffer.
IsolateStream<T>
A Stream whose content is generated by a function that runs in a separate Isolate. A flow control protocol is established to throttle the generating function, if needed, so that the SendPort/ReceivePort buffer doesn't get overly large.
IsolateStreamGenerator<T>
An object that runs in another isolate to generate the values given by an IsolateStream. Clients of IsolateStream subclass this. An instance is provided to the IsolateStream constructor, and is sent to a new isolate using SendPort.send. The new isolate is created with Isolate.spawn. See the restrictions in SendPort.send about sending object instances.