combineLatestStreams<A, B, R> function

Stream<R> combineLatestStreams<A, B, R>(
  1. Stream<A> a,
  2. Stream<B> b,
  3. R combine(
    1. A,
    2. B
    )
)

Emits combine(latestA, latestB) whenever EITHER a or b emits, but only once both have produced at least one value (the standard "combineLatest").

The result stream errors-forward from either source and completes when BOTH sources have completed. Source subscriptions start when the result stream is first listened to (not eagerly) and are cancelled on cancel, so no events are buffered before a listener exists.

Example:

// a: 1 .. 2 ..      b: .. x .. y
// emits: 1x, 2x, 2y

Audited: 2026-06-12 11:26 EDT

Implementation

Stream<R> combineLatestStreams<A, B, R>(Stream<A> a, Stream<B> b, R Function(A, B) combine) {
  late final StreamController<R> controller;
  StreamSubscription<A>? subA;
  StreamSubscription<B>? subB;
  // A holder cell preserves each value's real type (so combine takes A/B with
  // no cast) and doubles as the "have we seen one yet?" flag via null.
  _Cell<A>? cellA;
  _Cell<B>? cellB;
  int openCount = 2;

  void emit() {
    final _Cell<A>? ca = cellA;
    final _Cell<B>? cb = cellB;
    // Hold emission until BOTH sides have produced a value at least once.
    if (ca != null && cb != null) controller.add(combine(ca.value, cb.value));
  }

  void onSourceDone() {
    // The controller-close Future is intentionally not awaited; errors while
    // closing are not actionable here, and unawaited documents the intent.
    if (--openCount == 0) unawaited(controller.close());
  }

  controller = StreamController<R>(
    onListen: () {
      subA = a.listen(
        (A v) {
          cellA = _Cell<A>(v);
          emit();
        },
        onError: controller.addError,
        onDone: onSourceDone,
      );
      subB = b.listen(
        (B v) {
          cellB = _Cell<B>(v);
          emit();
        },
        onError: controller.addError,
        onDone: onSourceDone,
      );
    },
    onCancel: () async {
      await subA?.cancel();
      await subB?.cancel();
    },
  );
  return controller.stream;
}