combineLatest<T2, S> method

Stream<S> combineLatest<T2, S>(
  Stream<T2> other,
  FutureOr<S> combine(T, T2)
)

Combines the latest values from this stream with the latest values from other using combine.

No event will be emitted until both the source stream and other have each emitted at least one event. If either the source stream or other emits multiple events before the other emits its first event, all but the last value will be discarded. Once both streams have emitted at least once, the result stream will emit any time either input stream emits.

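The result stream will not close until both the source stream and other have closed, with one exception: if either stream closes without ever having emitted an event, no combined value can ever be produced, so the result stream closes immediately and the subscription to the remaining stream is canceled.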

For example:

source.combineLatest(other, (a, b) => a + b);

source: --1--2--------4--|
other:  -------3--|
result: -------5------7--|

Errors thrown by combine, along with any errors on the source stream or other, are forwarded to the result stream.

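If the source stream is a broadcast stream, the result stream will be as well, regardless of other's type. If other is a single-subscription stream and the source stream is a broadcast stream, other is converted to a broadcast stream with asBroadcastStream, and the subscription to the original other stream may never be canceled.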

Implementation

/// Combines the latest event from this stream with the latest event from
/// [other] by calling [combine], and emits each result.
///
/// Nothing is emitted until both streams have produced at least one event.
/// When [combine] returns a [Future], both inputs are paused until that
/// future completes. Errors from either input, and errors thrown by
/// [combine], are forwarded to the result stream.
///
/// The result is a broadcast stream exactly when this stream is one.
Stream<S> combineLatest<T2, S>(
    Stream<T2> other, FutureOr<S> Function(T, T2) combine) {
  // Events are forwarded synchronously from the input listeners.
  final controller = isBroadcast
      ? StreamController<S>.broadcast(sync: true)
      : StreamController<S>(sync: true);

  // When this stream is broadcast, make sure `other` is broadcast as well.
  other =
      (isBroadcast && !other.isBroadcast) ? other.asBroadcastStream() : other;

  StreamSubscription<T>? sourceSubscription;
  StreamSubscription<T2>? otherSubscription;

  var sourceDone = false;
  var otherDone = false;

  // Only read once the matching `*Started` flag is true.
  late T latestSource;
  late T2 latestOther;

  var sourceStarted = false;
  var otherStarted = false;

  // Emits `combine(latestSource, latestOther)` once both inputs have a value.
  void emitCombined() {
    if (!sourceStarted || !otherStarted) return;
    FutureOr<S> result;
    try {
      result = combine(latestSource, latestOther);
    } catch (e, s) {
      controller.addError(e, s);
      return;
    }
    if (result is Future<S>) {
      // Remember exactly which subscriptions were paused. The listener may
      // cancel (and, for a broadcast result, listen again) before the future
      // completes; by then the shared variables are null or refer to fresh
      // subscriptions that this call never paused.
      final pausedSource = sourceSubscription!..pause();
      final pausedOther = otherSubscription!..pause();
      result
          .then(controller.add, onError: controller.addError)
          .whenComplete(() {
        // Resume only subscriptions that are still the active ones.
        if (identical(sourceSubscription, pausedSource)) pausedSource.resume();
        if (identical(otherSubscription, pausedOther)) pausedOther.resume();
      });
    } else {
      controller.add(result);
    }
  }

  controller.onListen = () {
    assert(sourceSubscription == null);
    sourceSubscription = listen(
        (s) {
          sourceStarted = true;
          latestSource = s;
          emitCombined();
        },
        onError: controller.addError,
        onDone: () {
          sourceDone = true;
          if (otherDone) {
            controller.close();
          } else if (!sourceStarted) {
            // Nothing can ever be emitted
            otherSubscription!.cancel();
            controller.close();
          }
        });
    otherSubscription = other.listen(
        (o) {
          otherStarted = true;
          latestOther = o;
          emitCombined();
        },
        onError: controller.addError,
        onDone: () {
          otherDone = true;
          if (sourceDone) {
            controller.close();
          } else if (!otherStarted) {
            // Nothing can ever be emitted
            sourceSubscription!.cancel();
            controller.close();
          }
        });
    // Pause/resume forwarding is only wired up for single-subscription
    // results.
    if (!isBroadcast) {
      controller
        ..onPause = () {
          sourceSubscription!.pause();
          otherSubscription!.pause();
        }
        ..onResume = () {
          sourceSubscription!.resume();
          otherSubscription!.resume();
        };
    }
    controller.onCancel = () {
      var cancels = [
        sourceSubscription!.cancel(),
        otherSubscription!.cancel()
      ];
      // Clear the references so a later `onListen` can subscribe again.
      sourceSubscription = null;
      otherSubscription = null;
      return cancels.wait.then(ignoreArgument);
    };
  };
  return controller.stream;
}