takeUntil method
Takes values from this stream which are emitted before trigger
completes.
Completing trigger
differs from canceling a subscription in that values
which are emitted before the trigger, but have further asynchronous delays
in transformations following the takeUtil, will still go through.
Cancelling a subscription immediately stops values.
If trigger
completes as an error, the error will be forwarded through
the result stream before the result stream closes.
If trigger
completes as a value or as an error after this stream has
already ended, the completion will be ignored.
Implementation
Stream<T> takeUntil(Future<void> trigger) {
var controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
StreamSubscription<T>? subscription;
var isDone = false;
trigger.then((_) {
if (isDone) return;
isDone = true;
subscription?.cancel();
controller.close();
}, onError: (Object error, StackTrace stackTrace) {
if (isDone) return;
isDone = true;
controller
..addError(error, stackTrace)
..close();
});
controller.onListen = () {
if (isDone) return;
subscription =
listen(controller.add, onError: controller.addError, onDone: () {
if (isDone) return;
isDone = true;
controller.close();
});
if (!isBroadcast) {
controller
..onPause = subscription!.pause
..onResume = subscription!.resume;
}
controller.onCancel = () {
if (isDone) return null;
var toCancel = subscription!;
subscription = null;
return toCancel.cancel();
};
};
return controller.stream;
}