takeUntil method

Stream<T> takeUntil(
  1. Stream<void> signal
)

Implementation

/// Forwards the events of this stream until [signal] emits its first event.
///
/// The returned stream is a single-subscription stream. Nothing is listened
/// to until it gets a listener; at that point both this stream and [signal]
/// are subscribed to.
///
/// * Data and error events of this stream are forwarded unchanged.
/// * Errors emitted by [signal] are forwarded as errors as well.
/// * The returned stream closes when this stream is done or when [signal]
///   emits a data event, whichever happens first. If [signal] completes
///   without emitting, it has no effect.
///
/// Pausing and resuming the returned stream pauses and resumes the
/// subscription to this stream only; [signal] is still observed while
/// paused. Cancelling the returned stream cancels both subscriptions, and
/// the future returned by that cancel completes once both are cancelled.
Stream<T> takeUntil(Stream<void> signal) {
  StreamSubscription<T>? subscription;
  StreamSubscription<void>? signalSubscription;
  late final StreamController<T> controller;

  // Cancels both upstream subscriptions and completes when both are done.
  // Cancelling an already-cancelled subscription is harmless, so this may
  // run after `finish` has already started the cancellation.
  Future<void> cancelUpstream() {
    final pending = <Future<void>>[];
    final source = subscription;
    if (source != null) pending.add(source.cancel());
    final stopper = signalSubscription;
    if (stopper != null) pending.add(stopper.cancel());
    return Future.wait(pending);
  }

  // Ends the output stream. Both upstream subscriptions are cancelled
  // *before* closing so that no late source event or signal error can be
  // added to the already-closed controller (which would throw a StateError).
  void finish() {
    if (controller.isClosed) return;
    subscription?.cancel();
    signalSubscription?.cancel();
    controller.close();
  }

  controller = StreamController<T>(
    onListen: () {
      subscription = listen(
        controller.add,
        onError: controller.addError,
        onDone: finish,
      );
      signalSubscription = signal.listen(
        (_) => finish(),
        onError: controller.addError,
      );
    },
    onPause: () => subscription?.pause(),
    onResume: () => subscription?.resume(),
    // Returning the future lets a downstream `cancel()` wait for upstream
    // clean-up and surface any cancellation error.
    onCancel: cancelUpstream,
  );
  return controller.stream;
}