bind method

  1. @override
Stream<HashDigest> bind(
  1. Stream<List<int>> stream
)
override

Transforms the byte array input stream to generate a new stream which contains a single HashDigest

The expected behavior of this method is described below:

  • When the returned stream has a subscriber (calling Stream.listen), the message-digest generation begins from the input stream.
  • If the returned stream is paused, the processing of the input stream is also paused, and on resume, it continue processing from where it was left off.
  • If the returned stream is cancelled, the subscription to the input stream is also cancelled.
  • When the input stream is closed, the returned stream also gets closed with a HashDigest data. The returned stream may produce only one such data event in its life-time.
  • On error reading the input stream, or while processing the message digest, the subscription to input stream cancels immediately and the returned stream closes with an error event.

Implementation

@override
Stream<HashDigest> bind(Stream<List<int>> stream) {
  bool paused = false;
  bool cancelled = false;
  StreamSubscription<List<int>>? subscription;
  final controller = StreamController<HashDigest>(sync: false);
  controller.onCancel = () async {
    cancelled = true;
    await subscription?.cancel();
  };
  controller.onPause = () {
    paused = true;
    subscription?.pause();
  };
  controller.onResume = () {
    paused = false;
    subscription?.resume();
  };
  controller.onListen = () {
    if (cancelled) return;
    bool hasError = false;
    final sink = createSink();
    subscription = stream.listen(
      (List<int> event) {
        try {
          sink.add(event);
        } catch (err, stack) {
          hasError = true;
          subscription?.cancel();
          controller.addError(err, stack);
        }
      },
      cancelOnError: true,
      onError: (Object err, [StackTrace? stack]) {
        hasError = true;
        controller.addError(err, stack);
      },
      onDone: () {
        try {
          if (!hasError) {
            controller.add(sink.digest());
          }
        } catch (err, stack) {
          controller.addError(err, stack);
        } finally {
          controller.close();
        }
      },
    );
    if (paused) {
      subscription?.pause();
    }
  };
  return controller.stream;
}