asyncWhere method

Stream<T> asyncWhere(FutureOr<bool> test(T))

Discards events from this stream based on an asynchronous test callback.

Like where, but allows test to return a Future.

Events on the result stream are emitted in the order in which their test calls complete, which may not match the order of events in this stream.
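The reordering can be seen in a minimal sketch, assuming the extension is imported from package:stream_transform/stream_transform.dart. The helpers slowIsOdd and oddsByCompletion are hypothetical names introduced only for this illustration, and the delays are arbitrary.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

/// Hypothetical test for illustration: smaller values take longer to check.
Future<bool> slowIsOdd(int n) async {
  await Future<void>.delayed(Duration(milliseconds: (4 - n) * 50));
  return n.isOdd;
}

/// Collects the odd values of 1..3 in the order their tests complete.
Future<List<int>> oddsByCompletion() =>
    Stream.fromIterable([1, 2, 3]).asyncWhere(slowIsOdd).toList();

Future<void> main() async {
  // The test for 3 finishes before the test for 1, so 3 is emitted first.
  print(await oddsByCompletion()); // [3, 1]
}
```

If the source order must be preserved, the test has to complete in source order, for example by awaiting each event before the next one is tested.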

If the source stream is a broadcast stream, the result will be as well. With a broadcast stream, behavior also differs from where in that test is called only once per event, rather than once per listener per event.
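A short sketch of the broadcast behavior, under the same import assumption. The function countTestCalls is a hypothetical helper for this illustration; it attaches two listeners and counts how often the test runs.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

/// Returns how many times the test ran for two events and two listeners.
Future<int> countTestCalls() async {
  var calls = 0;
  final controller = StreamController<int>.broadcast();
  final evens = controller.stream.asyncWhere((n) async {
    calls++;
    return n.isEven;
  });

  // Two listeners on the same filtered broadcast stream.
  final first = evens.toList();
  final second = evens.toList();

  controller
    ..add(1)
    ..add(2);
  await controller.close();

  // Both listeners receive the same filtered events.
  print(await first);
  print(await second);
  return calls;
}

Future<void> main() async {
  // One call per event, not one per listener per event.
  print(await countTestCalls());
}
```

With where on the same broadcast stream, each listener would get its own filtered stream, and the predicate would run once for every listener on every event.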

Errors from the source stream are forwarded directly to the result stream. Errors from test are also forwarded to the result stream.
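As an illustration of the second case, the sketch below throws from test for one event and records what the listener sees. The helper collectWithErrors is hypothetical, and the import path is assumed as above.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

/// Records data and error events delivered by the filtered stream.
Future<List<String>> collectWithErrors() {
  final log = <String>[];
  final done = Completer<List<String>>();
  Stream.fromIterable([1, 2, 3]).asyncWhere((n) {
    // A failing test surfaces as an error event on the result stream.
    if (n == 2) throw StateError('cannot test 2');
    return n.isOdd;
  }).listen(
    (n) => log.add('data: $n'),
    onError: (Object e) => log.add('error: $e'),
    onDone: () => done.complete(log),
  );
  return done.future;
}

Future<void> main() async {
  // The error does not end the stream; later events are still tested.
  (await collectWithErrors()).forEach(print);
}
```

The listener here does not pass cancelOnError, so the subscription stays active after the error and the event 3 is still delivered.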

The result stream will not close until the source stream closes and all pending test calls have finished.

Implementation

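Stream<T> asyncWhere(FutureOr<bool> Function(T) test) {
  // Number of test calls that have started but not yet finished.
  var valuesWaiting = 0;
  var sourceDone = false;
  return transformByHandlers(onData: (element, sink) {
    valuesWaiting++;
    // Run the test without blocking delivery of later source events.
    () async {
      try {
        if (await test(element)) sink.add(element);
      } catch (e, st) {
        // Forward errors thrown by test to the result stream.
        sink.addError(e, st);
      }
      valuesWaiting--;
      // Close only once the source is done and no tests are pending.
      if (valuesWaiting <= 0 && sourceDone) sink.close();
    }();
  }, onDone: (sink) {
    sourceDone = true;
    // If tests are still pending, the last one to finish closes the sink.
    if (valuesWaiting <= 0) sink.close();
  });
}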