parallelMap<V> method

Stream<V> parallelMap<V>(
  Future<V> mapper(T t)
)

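Unlike asyncMap and semaphoreMap, parallelMap does not guarantee the order of the output. Each result is emitted as soon as its Future completes, so a later element can overtake an earlier one. The returned Stream closes once the source stream is done and every pending Future has completed.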
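
The ordering behavior described above can be seen in a small sketch. To keep it self-contained, the block carries a local copy of the method in an extension whose name, ParallelMapSketch, is hypothetical, as is the demo helper; the delays are arbitrary and chosen only so that later inputs finish first.

```dart
import 'dart:async';

// Local copy of the method so the sketch runs on its own.
// The extension name ParallelMapSketch is hypothetical.
extension ParallelMapSketch<T> on Stream<T> {
  Stream<V> parallelMap<V>(Future<V> Function(T t) mapper) {
    final controller = StreamController<V>();
    late StreamSubscription<T> subscription;
    var pending = 0; // mapper Futures started but not yet completed
    var sourceDone = false;

    // Close the output only when the source is done and nothing is in flight.
    void closeIfFinished() {
      if (sourceDone && pending == 0 && !controller.isClosed) {
        controller.close();
      }
    }

    subscription = listen(
      (event) {
        pending++;
        mapper(event)
            .then(controller.add)
            .catchError(controller.addError)
            .whenComplete(() {
          pending--;
          closeIfFinished();
        });
      },
      onError: controller.addError,
      onDone: () {
        sourceDone = true;
        closeIfFinished();
      },
      cancelOnError: false,
    );

    controller.onCancel = () => subscription.cancel();
    return controller.stream;
  }
}

// Hypothetical helper: smaller inputs wait longer (1 -> 150 ms, 2 -> 100 ms,
// 3 -> 50 ms), so results arrive in the reverse of the input order.
Future<List<int>> demo() {
  return Stream.fromIterable([1, 2, 3])
      .parallelMap((n) => Future.delayed(
            Duration(milliseconds: (4 - n) * 50),
            () => n * 10,
          ))
      .toList();
}

Future<void> main() async {
  print(await demo()); // [30, 20, 10]
}
```

With asyncMap the same input would yield [10, 20, 30], because asyncMap waits for each Future before handling the next element.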

Implementation

Stream<V> parallelMap<V>(Future<V> Function(T t) mapper) {
  final controller = StreamController<V>();
  late StreamSubscription<T> subscription;
  int pending = 0; // mapper Futures started but not yet completed
  bool sourceDone = false; // true once the source stream has finished

  subscription = listen(
    (event) {
      // Start the mapper right away; its result is forwarded on completion.
      pending++;
      mapper(event)
          .then(controller.add)
          .catchError(controller.addError)
          .whenComplete(() {
        pending--;
        if (sourceDone && pending == 0 && !controller.isClosed) {
          controller.close();
        }
      });
    },
    onError: controller.addError,
    onDone: () {
      sourceDone = true;
      // If Futures are still pending, the last one to complete closes the controller.
      if (pending == 0 && !controller.isClosed) {
        controller.close();
      }
    },
    cancelOnError: false,
  );

  controller.onCancel = () => subscription.cancel();
  return controller.stream;
}