takeUntil method

Stream<T> takeUntil(
  Future<void> trigger
)

Takes values from this stream which are emitted before trigger completes.
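As a rough illustration of the sentence above, the sketch below feeds a stream by hand and completes the trigger part-way through. It assumes the extension is exported by `package:stream_transform/stream_transform.dart`; the names `collectUntilStopped`, `source`, and `stop` are hypothetical and exist only for this example.

```dart
import 'dart:async';

// Assumption: takeUntil is the Stream extension exported by this package.
import 'package:stream_transform/stream_transform.dart';

/// Collects what a listener sees when the trigger completes mid-stream.
/// (Hypothetical helper, not part of the library.)
Future<List<int>> collectUntilStopped() async {
  final source = StreamController<int>();
  final stop = Completer<void>(); // its future plays the role of `trigger`
  final received = <int>[];

  source.stream.takeUntil(stop.future).listen(received.add);

  source
    ..add(1)
    ..add(2);
  // Give the event loop a turn so both pending events are delivered.
  await Future<void>.delayed(Duration.zero);

  // Completing the trigger ends the result stream.
  stop.complete();
  await Future<void>.delayed(Duration.zero);

  // Emitted after the trigger completed, so it never reaches the listener.
  source.add(3);
  await Future<void>.delayed(Duration.zero);

  await source.close();
  return received;
}

Future<void> main() async {
  print(await collectUntilStopped()); // [1, 2]
}
```

The explicit zero-duration delays are there only to make the ordering of events and trigger completion unambiguous; real code would normally complete the trigger from some other event, such as a timeout or a user action.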

Completing trigger differs from canceling a subscription: values emitted before trigger completes still go through, even if transformations following the takeUntil delay them asynchronously. Canceling a subscription stops values immediately.
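The difference can be sketched by putting a slow `asyncMap` step after the source and stopping each pipeline while that step is still working. This is a minimal sketch, assuming the extension comes from `package:stream_transform/stream_transform.dart`; `slowDouble`, `stopViaTrigger`, and `stopViaCancel` are hypothetical names, and the durations are arbitrary.

```dart
import 'dart:async';

// Assumption: takeUntil is the Stream extension exported by this package.
import 'package:stream_transform/stream_transform.dart';

/// A downstream transformation with an asynchronous delay (hypothetical).
Future<int> slowDouble(int value) async {
  await Future<void>.delayed(const Duration(milliseconds: 50));
  return value * 2;
}

/// Stops the pipeline by completing the trigger while slowDouble is running.
Future<List<int>> stopViaTrigger() async {
  final source = StreamController<int>();
  final stop = Completer<void>();
  final done = Completer<void>();
  final received = <int>[];

  source.stream
      .takeUntil(stop.future)
      .asyncMap(slowDouble)
      .listen(received.add, onDone: done.complete);

  source.add(1); // emitted before the trigger completes
  await Future<void>.delayed(const Duration(milliseconds: 10));
  stop.complete(); // slowDouble(1) is still in flight here
  await done.future;
  return received;
}

/// Stops the pipeline by canceling the subscription at the same moment.
Future<List<int>> stopViaCancel() async {
  final source = StreamController<int>();
  final received = <int>[];

  final subscription =
      source.stream.asyncMap(slowDouble).listen(received.add);

  source.add(1);
  await Future<void>.delayed(const Duration(milliseconds: 10));
  await subscription.cancel(); // the in-flight value is discarded
  // Wait long enough for slowDouble to have finished.
  await Future<void>.delayed(const Duration(milliseconds: 100));
  return received;
}

Future<void> main() async {
  print(await stopViaTrigger()); // [2]
  print(await stopViaCancel()); // []
}
```

In the first pipeline the value was already past the takeUntil when the trigger completed, so it finishes its delayed transformation and is delivered before the done event. In the second, cancellation cuts off delivery regardless of where the value was in the pipeline.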

If trigger completes as an error, the error will be forwarded through the result stream before the result stream closes.
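The error case might look like the following sketch, which records the order in which a listener sees a value, the forwarded error, and the done event. It assumes the extension is exported by `package:stream_transform/stream_transform.dart`; `logErrorTrigger` and the error message are hypothetical.

```dart
import 'dart:async';

// Assumption: takeUntil is the Stream extension exported by this package.
import 'package:stream_transform/stream_transform.dart';

/// Records, in order, what a listener observes when the trigger fails.
/// (Hypothetical helper, not part of the library.)
Future<List<String>> logErrorTrigger() async {
  final source = StreamController<int>();
  final stop = Completer<void>();
  final done = Completer<void>();
  final log = <String>[];

  source.stream.takeUntil(stop.future).listen(
    (value) => log.add('value: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () {
      log.add('done');
      done.complete();
    },
  );

  source.add(1);
  // Give the event loop a turn so the value is delivered first.
  await Future<void>.delayed(Duration.zero);

  // The trigger completes as an error instead of a value.
  stop.completeError('stop requested');
  await done.future;

  await source.close();
  return log;
}

Future<void> main() async {
  print(await logErrorTrigger());
  // [value: 1, error: stop requested, done]
}
```

A listener without an `onError` handler would instead surface the forwarded error as an uncaught asynchronous error, so callers that pass a fallible future as the trigger will usually want to handle errors on the result stream.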

If trigger completes as a value or as an error after this stream has already ended, the completion will be ignored.

Implementation

Stream<T> takeUntil(Future<void> trigger) {
  var controller = isBroadcast
      ? StreamController<T>.broadcast(sync: true)
      : StreamController<T>(sync: true);

  StreamSubscription<T>? subscription;
  var isDone = false;
  trigger.then((_) {
    if (isDone) return;
    isDone = true;
    subscription?.cancel();
    controller.close();
  }, onError: (Object error, StackTrace stackTrace) {
    if (isDone) return;
    isDone = true;
    controller
      ..addError(error, stackTrace)
      ..close();
  });

  controller.onListen = () {
    if (isDone) return;
    subscription =
        listen(controller.add, onError: controller.addError, onDone: () {
      if (isDone) return;
      isDone = true;
      controller.close();
    });
    if (!isBroadcast) {
      controller
        ..onPause = subscription!.pause
        ..onResume = subscription!.resume;
    }
    controller.onCancel = () {
      if (isDone) return null;
      var toCancel = subscription!;
      subscription = null;
      return toCancel.cancel();
    };
  };
  return controller.stream;
}