ForEach<T> constructor

ForEach<T>(
  Stream<T> stream,
  CancellationToken token,
  FutureOr<bool> onData(T event), {
  bool? cancelOnError,
})

Implementation

/// Starts listening to [stream] and ties the subscription to [token].
///
/// Stream events are delivered to `_listen`, errors to `_onError` and the
/// done signal to `_onDone`. [onData] is stored in `_onData`; how its
/// `bool` result is interpreted is decided by code outside this constructor.
/// [cancelOnError] is forwarded unchanged to [Stream.listen].
///
/// A handler is registered on [token]. When that handler runs, it calls
/// `_removerHandler()`, cancels the subscription and, once the cancel
/// future has completed, reports a [TaskCanceledError] through `_onError`
/// unless `_completer` has already completed.
ForEach(this.stream, CancellationToken token,
    FutureOr<bool> Function(T event) onData,
    {bool? cancelOnError})
    : _token = token,
      _onData = onData {
  // Invoked by the token; runs the cancellation sequence.
  void cancelSubscription() {
    _removerHandler();
    _subscription.cancel().whenComplete(() {
      // Skip the cancellation error if the completer was completed earlier.
      if (_completer.isCompleted) {
        return;
      }

      _onError(TaskCanceledError(), StackTrace.current);
    });
  }

  // Subscribe first so `_subscription` is assigned before the handler is
  // registered on the token.
  _subscription = stream.listen(
    _listen,
    onError: _onError,
    onDone: _onDone,
    cancelOnError: cancelOnError,
  );
  _handler = token.addHandler(cancelSubscription);
}