combineLatest<T2, S> method
Combines the latest values from this stream with the latest values from
`other` using `combine`.
No event will be emitted until both the source stream and `other` have
each emitted at least one event. If either the source stream or `other`
emits multiple events before the other emits its first event, all but the
last value will be discarded. Once both streams have emitted at least
once, the result stream will emit any time either input stream emits.
The result stream will not close until both the source stream and `other`
have closed.
For example:
source.combineLatest(other, (a, b) => a + b);
source: --1--2--------4--|
other:  -------3--|
result: -------5------7--|
Errors thrown by `combine`, along with any errors on the source stream or
`other`, are forwarded to the result stream.
If the source stream is a broadcast stream, the result stream will be as
well, regardless of `other`'s type. If a single subscription stream is
combined with a broadcast stream it may never be canceled.
Implementation
/// Combines the latest event from this stream with the latest event from
/// [other] using [combine].
///
/// Nothing is emitted until both streams have produced at least one event;
/// after that, every event on either stream triggers a call to [combine]
/// with the most recent value from each side.
///
/// If [combine] returns a [Future], both input subscriptions are paused
/// until it completes, and its value or error is forwarded to the result.
/// Errors thrown synchronously by [combine] and errors from either input
/// are forwarded as error events.
///
/// The result closes once both inputs are done, or as soon as one input
/// finishes without ever emitting (no combination is possible then). The
/// result is a broadcast stream exactly when this stream is.
Stream<S> combineLatest<T2, S>(
    Stream<T2> other, FutureOr<S> Function(T, T2) combine) {
  final controller = isBroadcast
      ? StreamController<S>.broadcast(sync: true)
      : StreamController<S>(sync: true);

  // A broadcast result may be listened to repeatedly, so `other` must be
  // listenable more than once as well.
  other =
      (isBroadcast && !other.isBroadcast) ? other.asBroadcastStream() : other;

  StreamSubscription<T>? sourceSubscription;
  StreamSubscription<T2>? otherSubscription;

  var sourceDone = false;
  var otherDone = false;

  late T latestSource;
  late T2 latestOther;

  // Guard the `late` values above: they are only read once both are true.
  var sourceStarted = false;
  var otherStarted = false;

  void emitCombined() {
    if (!sourceStarted || !otherStarted) return;
    FutureOr<S> result;
    try {
      result = combine(latestSource, latestOther);
    } catch (e, s) {
      controller.addError(e, s);
      return;
    }
    if (result is Future<S>) {
      // Capture the subscriptions being paused so that exactly those are
      // resumed when the future completes. `onCancel` nulls the shared
      // variables, so dereferencing them again in the completion callback
      // would throw if the listener cancelled while `combine` was pending.
      final pausedSource = sourceSubscription!;
      final pausedOther = otherSubscription!;
      pausedSource.pause();
      pausedOther.pause();
      result
          .then(controller.add, onError: controller.addError)
          .whenComplete(() {
        pausedSource.resume();
        pausedOther.resume();
      });
    } else {
      controller.add(result);
    }
  }

  controller.onListen = () {
    assert(sourceSubscription == null);
    sourceSubscription = listen(
        (s) {
          sourceStarted = true;
          latestSource = s;
          emitCombined();
        },
        onError: controller.addError,
        onDone: () {
          sourceDone = true;
          if (otherDone) {
            controller.close();
          } else if (!sourceStarted) {
            // Nothing can ever be emitted
            otherSubscription!.cancel();
            controller.close();
          }
        });
    otherSubscription = other.listen(
        (o) {
          otherStarted = true;
          latestOther = o;
          emitCombined();
        },
        onError: controller.addError,
        onDone: () {
          otherDone = true;
          if (sourceDone) {
            controller.close();
          } else if (!otherStarted) {
            // Nothing can ever be emitted
            sourceSubscription!.cancel();
            controller.close();
          }
        });
    // Pause/resume callbacks are only installed for the single-subscription
    // controller.
    if (!isBroadcast) {
      controller
        ..onPause = () {
          sourceSubscription!.pause();
          otherSubscription!.pause();
        }
        ..onResume = () {
          sourceSubscription!.resume();
          otherSubscription!.resume();
        };
    }
    controller.onCancel = () {
      var cancels = [
        sourceSubscription!.cancel(),
        otherSubscription!.cancel()
      ];
      // Reset so the `onListen` assertion holds if the result is listened
      // to again.
      sourceSubscription = null;
      otherSubscription = null;
      return cancels.wait.then(ignoreArgument);
    };
  };
  return controller.stream;
}