concurrentAsyncExpand<S> method
Like asyncExpand but the convert
callback may be called for an element
before the Stream emitted by the previous element has closed.
Events on the result stream will be emitted in the order they are emitted by the sub streams, which may not match the order of this stream.
Errors from convert
, the source stream, or any of the sub streams are
forwarded to the result stream.
The result stream will not close until the source stream closes and all sub streams have closed.
If the source stream is a broadcast stream, the result will be as well,
regardless of the types of streams created by convert
. In this case,
some care should be taken:
- If
convert
returns a single subscription stream it may be listened to and never canceled. - For any period of time where there are no listeners on the result stream, any sub streams from previously emitted events will be ignored, regardless of whether they emit further events after a listener is added back.
See also:
- switchMap, which cancels subscriptions to the previous sub stream instead of concurrently emitting events from all sub streams.
Implementation
Stream<S> concurrentAsyncExpand<S>(Stream<S> Function(T) convert) {
final controller = isBroadcast
? StreamController<S>.broadcast(sync: true)
: StreamController<S>(sync: true);
controller.onListen = () {
final subscriptions = <StreamSubscription<dynamic>>[];
final outerSubscription = map(convert).listen((inner) {
if (isBroadcast && !inner.isBroadcast) {
inner = inner.asBroadcastStream();
}
final subscription =
inner.listen(controller.add, onError: controller.addError);
subscription.onDone(() {
subscriptions.remove(subscription);
if (subscriptions.isEmpty) controller.close();
});
subscriptions.add(subscription);
}, onError: controller.addError);
outerSubscription.onDone(() {
subscriptions.remove(outerSubscription);
if (subscriptions.isEmpty) controller.close();
});
subscriptions.add(outerSubscription);
if (!isBroadcast) {
controller
..onPause = () {
for (final subscription in subscriptions) {
subscription.pause();
}
}
..onResume = () {
for (final subscription in subscriptions) {
subscription.resume();
}
};
}
controller.onCancel = () {
if (subscriptions.isEmpty) return null;
return [for (var s in subscriptions) s.cancel()]
.wait
.then(ignoreArgument);
};
};
return controller.stream;
}