takeUntil method

Stream<T> takeUntil(
  1. Future<void> trigger
)

Takes values from this stream which are emitted before trigger completes.

Completing trigger differs from canceling a subscription in that values which are emitted before the trigger, but have further asynchronous delays in transformations following the takeUntil, will still go through. Canceling a subscription immediately stops values.

Implementation

/// Returns a stream that forwards this stream's data and error events until
/// [trigger] completes, then closes.
///
/// The result is a broadcast stream exactly when this stream is. The source
/// is only listened to once the returned stream gets a listener; if the
/// trigger callback has already run by then, the source is never listened to
/// and the returned stream is simply closed.
Stream<T> takeUntil(Future<void> trigger) {
  // Synchronous controller so events are forwarded without an extra
  // asynchronous hop.
  final StreamController<T> output;
  if (isBroadcast) {
    output = StreamController<T>.broadcast(sync: true);
  } else {
    output = StreamController<T>(sync: true);
  }

  // Subscription to the source; null before the first listen and after a
  // downstream cancel.
  StreamSubscription<T>? upstream;
  // Becomes true once the output has been closed, whether by the trigger or
  // by the source finishing. Ensures `close` runs only once.
  var finished = false;

  trigger.then((_) {
    if (!finished) {
      finished = true;
      upstream?.cancel();
      output.close();
    }
  });

  output.onListen = () {
    if (finished) return;

    final active = listen(output.add, onError: output.addError, onDone: () {
      if (!finished) {
        finished = true;
        output.close();
      }
    });
    upstream = active;

    // Only single-subscription controllers forward pause/resume upstream.
    if (!isBroadcast) {
      output.onPause = active.pause;
      output.onResume = active.resume;
    }

    output.onCancel = () {
      if (!finished) {
        // Clear the reference first so a later trigger completion does not
        // cancel the same subscription again.
        final toCancel = upstream!;
        upstream = null;
        return toCancel.cancel();
      }
      return null;
    };
  };

  return output.stream;
}