concurrentAsyncMap<S> method

Stream<S> concurrentAsyncMap<S>(
  FutureOr<S> convert(T)
)

Like asyncMap, but the convert callback may be called for an element before processing of the previous element has finished.

Events on the result stream are emitted in the order in which the convert calls complete, which may not match the order of this stream.
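
As a minimal sketch of the ordering behavior, the example below assumes the extension is imported from package:stream_transform. The delay values are hypothetical and chosen only for illustration, so that later elements finish converting before earlier ones.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

Future<void> main() async {
  // Hypothetical delays in milliseconds; each element takes that long to convert.
  final source = Stream.fromIterable([300, 100, 200]);

  final results = await source.concurrentAsyncMap((ms) async {
    // All three conversions are started without waiting for each other.
    await Future<void>.delayed(Duration(milliseconds: ms));
    return ms;
  }).toList();

  // Results arrive in completion order, so the shortest delay should come
  // first. With asyncMap the source order would be preserved instead.
  print(results);
}
```

Because the conversions overlap, the whole run should take roughly as long as the slowest conversion rather than the sum of all three.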

If this stream is a broadcast stream, the result will be as well. When used with a broadcast stream, behavior also differs from asyncMap in that the convert function is called only once per event, rather than once per listener per event. The convert callback won't be called for events that arrive while a broadcast stream has no listener.
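
The once-per-event behavior on broadcast streams can be sketched as follows. The counter name calls and the doubling conversion are illustrative assumptions, not part of the library.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

Future<void> main() async {
  final controller = StreamController<int>.broadcast();
  var calls = 0; // Counts how often convert runs (illustrative only).

  final mapped = controller.stream.concurrentAsyncMap((n) async {
    calls++;
    return n * 2;
  });

  // Two listeners on the same result stream, attached before any event.
  final first = mapped.toList();
  final second = mapped.toList();

  controller
    ..add(1)
    ..add(2);
  await controller.close();

  // Both listeners receive the same converted values.
  print(await first);
  print(await second);
  // convert ran once per event, not once per listener per event.
  print('convert calls: $calls');
}
```

Both listeners are attached before the first event is added. Events added while the result stream has no listener would not be converted at all.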

Errors from convert or this stream are forwarded directly to the result stream.
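
A short sketch of error forwarding, under the same import assumption: an error thrown by convert for one element shows up as an error event on the result stream, and the remaining elements are still converted. The StateError and its message are made up for the example.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

Future<void> main() async {
  final done = Completer<void>();

  Stream.fromIterable([1, 2, 3]).concurrentAsyncMap((n) async {
    // Hypothetical failure for a single element.
    if (n == 2) throw StateError('cannot convert $n');
    return n;
  }).listen(
    (value) => print('value: $value'),
    // The error from convert arrives here; the stream keeps going.
    onError: (Object error) => print('error: $error'),
    // Reached only after the source closed and all conversions settled.
    onDone: done.complete,
  );

  await done.future;
}
```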

The result stream will not close until this stream closes and all pending conversions have finished.

Implementation

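Stream<S> concurrentAsyncMap<S>(FutureOr<S> Function(T) convert) {
  // Number of conversions that have started but not yet completed.
  var valuesWaiting = 0;
  // Whether this (source) stream has closed.
  var sourceDone = false;
  return transformByHandlers(onData: (element, sink) {
    valuesWaiting++;
    // Start the conversion without awaiting it, so later elements can begin
    // converting before this one finishes.
    () async {
      try {
        sink.add(await convert(element));
      } catch (e, st) {
        sink.addError(e, st);
      }
      valuesWaiting--;
      // Close only once the source is done and no conversions are pending.
      if (valuesWaiting <= 0 && sourceDone) sink.close();
    }();
  }, onDone: (sink) {
    sourceDone = true;
    // If conversions are still pending, the last one to finish closes the sink.
    if (valuesWaiting <= 0) sink.close();
  });
}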