switchLatest method

Stream<T> switchLatest()

Emits values from the most recently emitted Stream.

When the source emits a stream, the output will switch to emitting events from that stream.

Whether the source stream is a single-subscription stream or a broadcast stream, the result stream will be the same kind of stream, regardless of the types of streams emitted.

Implementation

Stream<T> switchLatest() {
  var controller = isBroadcast
      ? StreamController<T>.broadcast(sync: true)
      : StreamController<T>(sync: true);

  controller.onListen = () {
    StreamSubscription<T>? innerSubscription;
    var outerStreamDone = false;

    void listenToInnerStream(Stream<T> innerStream) {
      assert(innerSubscription == null);
      var subscription = innerStream
          .listen(controller.add, onError: controller.addError, onDone: () {
        innerSubscription = null;
        if (outerStreamDone) controller.close();
      });
      // If a pause happens during an innerSubscription.cancel,
      // we still listen to the next stream when the cancel is done.
      // Then we immediately pause it again here.
      if (controller.isPaused) subscription.pause();
      innerSubscription = subscription;
    }

    var addError = controller.addError;
    final outerSubscription = listen(null, onError: addError, onDone: () {
      outerStreamDone = true;
      if (innerSubscription == null) controller.close();
    });
    outerSubscription.onData((innerStream) async {
      var currentSubscription = innerSubscription;
      if (currentSubscription == null) {
        listenToInnerStream(innerStream);
        return;
      }
      innerSubscription = null;
      outerSubscription.pause();
      try {
        await currentSubscription.cancel();
      } catch (error, stack) {
        controller.addError(error, stack);
      } finally {
        if (!isBroadcast && !controller.hasListener) {
          // Result single-subscription stream subscription was cancelled
          // while waiting for previous innerStream cancel.
          //
          // Ensure that the last received stream is also listened to and
          // cancelled, then do nothing further.
          innerStream.listen(null).cancel().ignore();
        } else {
          outerSubscription.resume();
          listenToInnerStream(innerStream);
        }
      }
    });
    if (!isBroadcast) {
      controller
        ..onPause = () {
          innerSubscription?.pause();
          outerSubscription.pause();
        }
        ..onResume = () {
          innerSubscription?.resume();
          outerSubscription.resume();
        };
    }
    controller.onCancel = () {
      var _innerSubscription = innerSubscription;
      var cancels = [
        if (!outerStreamDone) outerSubscription.cancel(),
        if (_innerSubscription != null) _innerSubscription.cancel(),
      ]
        // Handle opt-out nulls
        ..removeWhere((Object? f) => f == null);
      if (cancels.isEmpty) return null;
      return Future.wait(cancels).then(_ignore);
    };
  };
  return controller.stream;
}