forEach<S, T> method

Stream<T> forEach<S, T>(
  Iterable<S> elements,
  FutureOr<T> action(S source), {
  bool onError(S item, Object error, StackTrace stack)?,
})

Returns a Stream containing the result of action applied to each element of elements.

While action is invoked on each element of elements in order, the returned Stream may emit results out of order – especially if the completion time of action varies.

If action throws an error, the source item along with the error object and StackTrace are passed to onError, if it is provided. If onError returns true, the error is added to the returned Stream; otherwise it is ignored. If onError is not provided, every error thrown by action is added to the returned Stream.

Errors thrown from iterating elements will not be passed to onError. They will always be added to the returned stream as an error.

Note: all of the resources of this Pool will be used from the time the returned Stream is listened to until it is completed or canceled.

Note: if this Pool is closed before the returned Stream is listened to, a StateError is thrown.

Implementation

/// Returns a [Stream] containing the result of [action] applied to each
/// element of [elements].
///
/// [action] is started on the elements in iteration order, but up to
/// `_maxAllocatedResources` invocations run concurrently, so results may be
/// emitted out of order when completion times differ.
///
/// If [action] throws, the source item, the error and its [StackTrace] are
/// passed to [onError]. When [onError] returns `true` the error is added to
/// the returned stream; otherwise it is dropped. When [onError] is omitted,
/// every such error is added to the stream.
///
/// Errors thrown while iterating [elements] are never passed to [onError];
/// they are always added to the returned stream as errors.
///
/// Nothing runs until the returned stream is listened to. Pausing the
/// subscription stops workers from starting new items; canceling it lets
/// in-flight actions finish and then releases the workers, including any that
/// were parked because the subscription was paused.
Stream<T> forEach<S, T>(
    Iterable<S> elements, FutureOr<T> Function(S source) action,
    {bool Function(S item, Object error, StackTrace stack)? onError}) {
  // With no handler supplied, forward every action error to the stream.
  onError ??= (item, e, s) => true;

  // Set once the listener cancels; workers stop before starting another item.
  var cancelPending = false;

  // Non-null while the listener has the stream paused; cleared on resume or
  // cancel.
  Completer<void>? resumeCompleter;
  late StreamController<T> controller;

  // Shared by every worker; assigned in `onListen`.
  late Iterator<S> iterator;

  Future<void> run(int _) async {
    while (iterator.moveNext()) {
      // caching `current` is necessary because there are async breaks
      // in this code and `iterator` is shared across many workers
      final current = iterator.current;

      // NOTE(review): presumably restarts the pool's inactivity timeout;
      // confirm against `_resetTimer`.
      _resetTimer();

      // Loop instead of testing once: the listener may resume and pause again
      // before this continuation runs, in which case a fresh completer is
      // already in place and this worker must keep waiting.
      while (resumeCompleter != null) {
        await resumeCompleter!.future;
      }

      if (cancelPending) {
        break;
      }

      T value;
      try {
        value = await action(current);
      } catch (e, stack) {
        if (onError!(current, e, stack)) {
          controller.addError(e, stack);
        }
        continue;
      }
      controller.add(value);
    }
  }

  Future<void>? doneFuture;

  void onListen() {
    iterator = elements.iterator;

    assert(doneFuture == null);
    // One worker per `_maxAllocatedResources`, each started through
    // `withResource`.
    var futures = Iterable<Future<void>>.generate(
        _maxAllocatedResources, (i) => withResource(() => run(i)));
    // Errors escaping a worker (e.g. thrown by `moveNext`) surface here and
    // are forwarded to the stream.
    doneFuture = Future.wait(futures, eagerError: true)
        .then<void>((_) {})
        .catchError(controller.addError);

    doneFuture!.whenComplete(controller.close);
  }

  controller = StreamController<T>(
    sync: true,
    onListen: onListen,
    onCancel: () async {
      assert(!cancelPending);
      cancelPending = true;
      // `onResume` is not invoked after a cancel, so workers parked on a
      // pause would otherwise wait forever and `doneFuture` would never
      // complete. Wake them; they observe `cancelPending` and exit.
      resumeCompleter?.complete();
      resumeCompleter = null;
      await doneFuture;
    },
    onPause: () {
      assert(resumeCompleter == null);
      resumeCompleter = Completer<void>();
    },
    onResume: () {
      assert(resumeCompleter != null);
      resumeCompleter!.complete();
      resumeCompleter = null;
    },
  );

  return controller.stream;
}