takeUntil method
Implementation
Stream<T> takeUntil(Stream<void> signal) {
StreamController<T>? controller;
StreamSubscription<T>? subscription;
StreamSubscription<void>? signalSubscription;
controller = StreamController<T>(
onListen: () {
subscription = listen(
controller!.add,
onError: controller.addError,
onDone: controller.close,
);
signalSubscription = signal.listen(
(_) => controller?.close(),
onError: controller?.addError,
);
},
onPause: () => subscription?.pause(),
onResume: () => subscription?.resume(),
onCancel: () {
subscription?.cancel();
signalSubscription?.cancel();
},
);
return controller.stream;
}