concurrentAsyncMap<S> method
Like asyncMap but the convert
callback may be called for an element
before processing for the previous element is finished.
Events on the result stream will be emitted in the order that convert
completed which may not match the order of this stream.
If this stream is a broadcast stream the result will be as well.
When used with a broadcast stream behavior also differs from asyncMap in
that the convert
function is only called once per event, rather than
once per listener per event. The convert
callback won't be called for
events while a broadcast stream has no listener.
Errors from convert
or this stream are forwarded directly to the
result stream.
The result stream will not close until this stream closes and all pending conversions have finished.
Implementation
Stream<S> concurrentAsyncMap<S>(FutureOr<S> Function(T) convert) {
var valuesWaiting = 0;
var sourceDone = false;
return transformByHandlers(onData: (element, sink) {
valuesWaiting++;
() async {
try {
sink.add(await convert(element));
} catch (e, st) {
sink.addError(e, st);
}
valuesWaiting--;
if (valuesWaiting <= 0 && sourceDone) sink.close();
}();
}, onDone: (sink) {
sourceDone = true;
if (valuesWaiting <= 0) sink.close();
});
}