forEach<S, T> method
Stream<T> forEach<S, T>(
  Iterable<S> elements,
  FutureOr<T> action(S source), {
  bool onError(S item, Object error, StackTrace stack)?,
})
Returns a Stream containing the result of action applied to each element of elements.
While action is invoked on each element of elements in order, the returned Stream may emit items out of order – especially if the completion time of action varies.
If action throws an error, the source item along with the error object and StackTrace are passed to onError, if it is provided. If onError returns true, the error is added to the returned Stream; otherwise it is ignored.
Errors thrown from iterating elements will not be passed to onError. They will always be added to the returned Stream as an error.
Note: all of the resources of this Pool will be used when the returned Stream is listened to, until it is completed or canceled.
Note: if this Pool is closed before the returned Stream is listened to, a StateError is thrown.
Implementation
/// Applies [action] to every element of [elements] and returns a [Stream] of
/// the results.
///
/// Nothing runs until the returned stream is listened to. At that point
/// `_maxAllocatedResources` workers are started, each through [withResource],
/// and all of them pull from one shared iterator over [elements]. Results are
/// added to the stream as each call to [action] completes, so they may arrive
/// in a different order than [elements].
///
/// When [action] throws, [onError] is called with the source item, the error
/// and its stack trace; the error is added to the stream only if [onError]
/// returns `true`. If [onError] is omitted, every such error is added.
/// Errors that escape a worker for any other reason (for example, thrown
/// while advancing the iterator) bypass [onError] and are added to the
/// stream directly.
///
/// Pausing the subscription holds each worker before its next call to
/// [action]; cancelling stops workers from starting further calls and
/// completes once all workers have finished.
Stream<T> forEach<S, T>(
    Iterable<S> elements, FutureOr<T> Function(S source) action,
    {bool Function(S item, Object error, StackTrace stack)? onError}) {
  // Without a caller-supplied filter, every error from `action` is forwarded.
  final bool Function(S item, Object error, StackTrace stack) shouldForward =
      onError ?? ((item, e, s) => true);

  late StreamController<T> output;
  late Iterator<S> source;

  // Non-null only while the subscription is paused.
  Completer? pauseGate;

  // Set once in `startWorkers`; completes after every worker has returned.
  Future<void>? allWorkersDone;

  var cancelled = false;

  Future<void> worker(int _) async {
    while (source.moveNext()) {
      // Capture the element immediately: `source` is shared by all workers
      // and another worker may advance it during the awaits below.
      final item = source.current;

      _resetTimer();

      final gate = pauseGate;
      if (gate != null) {
        await gate.future;
      }

      if (cancelled) {
        return;
      }

      T result;
      try {
        result = await action(item);
      } catch (error, trace) {
        if (shouldForward(item, error, trace)) {
          output.addError(error, trace);
        }
        continue;
      }
      // Deliberately outside the `try` so that only failures of `action`
      // are routed through the error filter.
      output.add(result);
    }
  }

  void startWorkers() {
    source = elements.iterator;

    assert(allWorkersDone == null);
    final workers = Iterable<Future<void>>.generate(
        _maxAllocatedResources, (i) => withResource(() => worker(i)));
    final done = Future.wait(workers, eagerError: true)
        .then<void>((_) {})
        .catchError(output.addError);
    allWorkersDone = done;

    done.whenComplete(output.close);
  }

  Future<void> stopWorkers() async {
    assert(!cancelled);
    cancelled = true;
    await allWorkersDone;
  }

  void holdWorkers() {
    assert(pauseGate == null);
    pauseGate = Completer<void>();
  }

  void releaseWorkers() {
    assert(pauseGate != null);
    pauseGate!.complete();
    pauseGate = null;
  }

  output = StreamController<T>(
    sync: true,
    onListen: startWorkers,
    onCancel: stopWorkers,
    onPause: holdWorkers,
    onResume: releaseWorkers,
  );

  return output.stream;
}