mergeAll method
Merges values and errors from this stream and any stream in others
in
any order as they arrive.
The result stream will not close until this stream and all streams
in others
have closed.
For example:
final result = first.mergeAll([second, third]);
first: 1--2--------3--|
second: ---------4-------5--|
third: ------6---------------7--|
result: 1--2--6--4--3----5----7--|
If this stream is a broadcast stream, the result stream will be as
well, regardless the types of streams in others
. If a single
subscription stream is merged into a broadcast stream it may never be
canceled since there may be broadcast listeners added later.
If a broadcast stream is merged into a single-subscription stream any events emitted by that stream before the result stream has a subscriber will be discarded.
Implementation
Stream<T> mergeAll(Iterable<Stream<T>> others) {
final controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
final allStreams = [
this,
for (final other in others)
!isBroadcast || other.isBroadcast ? other : other.asBroadcastStream(),
];
controller.onListen = () {
final subscriptions = <StreamSubscription<T>>[];
for (final stream in allStreams) {
final subscription =
stream.listen(controller.add, onError: controller.addError);
subscription.onDone(() {
subscriptions.remove(subscription);
if (subscriptions.isEmpty) controller.close();
});
subscriptions.add(subscription);
}
if (!isBroadcast) {
controller
..onPause = () {
for (final subscription in subscriptions) {
subscription.pause();
}
}
..onResume = () {
for (final subscription in subscriptions) {
subscription.resume();
}
};
}
controller.onCancel = () {
if (subscriptions.isEmpty) return null;
return [for (var s in subscriptions) s.cancel()]
.wait
.then(ignoreArgument);
};
};
return controller.stream;
}