switchLatest method
Emits values from the most recently emitted Stream.
When the source emits a stream, the output will switch to emitting events from that stream.
The result stream is the same kind of stream as the source stream (single-subscription or broadcast), regardless of the kinds of streams the source emits.
Implementation
/// Returns a stream that forwards the events of the most recent inner stream
/// emitted by this stream.
///
/// Each time this (outer) stream emits a new inner stream, the subscription
/// to the previous inner stream is cancelled and the new one is listened to.
/// Errors from the outer stream and from the active inner stream are
/// forwarded to the result. The result closes once the outer stream is done
/// and no inner stream is still active.
///
/// The result is a broadcast stream when [isBroadcast] is true and a
/// single-subscription stream otherwise.
Stream<T> switchLatest() {
// The controller is synchronous in both variants; only its kind follows the
// source stream.
var controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
// Nothing is subscribed to until the result stream is listened to; all state
// below is local to one invocation of this callback.
controller.onListen = () {
// Subscription to the inner stream currently being forwarded, or null when
// no inner stream is active (none received yet, finished, or being
// switched away from).
StreamSubscription<T>? innerSubscription;
// Set once the outer stream has delivered its done event.
var outerStreamDone = false;
// Starts forwarding [innerStream] to the controller and records its
// subscription. Callers must have cleared `innerSubscription` first.
void listenToInnerStream(Stream<T> innerStream) {
assert(innerSubscription == null);
var subscription = innerStream
.listen(controller.add, onError: controller.addError, onDone: () {
innerSubscription = null;
// The inner stream ending only closes the result if the outer stream
// has already ended too; otherwise another inner stream may follow.
if (outerStreamDone) controller.close();
});
// If a pause happens during an innerSubscription.cancel,
// we still listen to the next stream when the cancel is done.
// Then we immediately pause it again here.
if (controller.isPaused) subscription.pause();
innerSubscription = subscription;
}
var addError = controller.addError;
// The outer subscription is created without a data handler so that the
// handler installed below can refer to `outerSubscription` itself.
final outerSubscription = listen(null, onError: addError, onDone: () {
outerStreamDone = true;
// With no active inner stream there is nothing left to forward.
if (innerSubscription == null) controller.close();
});
outerSubscription.onData((innerStream) async {
var currentSubscription = innerSubscription;
// Fast path: no inner stream is active, so switch synchronously.
if (currentSubscription == null) {
listenToInnerStream(innerStream);
return;
}
// Clear the active subscription and pause the outer stream while the
// previous inner subscription's cancel is awaited.
innerSubscription = null;
outerSubscription.pause();
try {
await currentSubscription.cancel();
} catch (error, stack) {
// A failure while cancelling the previous inner stream is reported on
// the result stream instead of being dropped.
controller.addError(error, stack);
} finally {
// NOTE(review): for a broadcast result this condition is never true, so
// the else branch listens to `innerStream` even if every listener
// cancelled while the cancel above was pending — confirm this is
// intended.
if (!isBroadcast && !controller.hasListener) {
// Result single-subscription stream subscription was cancelled
// while waiting for previous innerStream cancel.
//
// Ensure that the last received stream is also listened to and
// cancelled, then do nothing further.
innerStream.listen(null).cancel().ignore();
} else {
outerSubscription.resume();
listenToInnerStream(innerStream);
}
}
});
// Pause and resume of the result are propagated to both the active inner
// subscription and the outer subscription, for single-subscription results
// only.
if (!isBroadcast) {
controller
..onPause = () {
innerSubscription?.pause();
outerSubscription.pause();
}
..onResume = () {
innerSubscription?.resume();
outerSubscription.resume();
};
}
// On cancel, cancel the outer subscription (unless the outer stream already
// finished) and the active inner subscription (if any).
controller.onCancel = () {
var sub = innerSubscription;
var cancels = [
if (!outerStreamDone) outerSubscription.cancel(),
if (sub != null) sub.cancel(),
];
// Nothing to cancel: return null rather than a future.
if (cancels.isEmpty) return null;
// Otherwise return a future that waits for all pending cancels.
// `ignoreArgument` is defined outside this block; presumably it discards
// the list of results — verify against its definition.
return cancels.wait.then(ignoreArgument);
};
};
return controller.stream;
}