zipStreams<A, B, R> function

Stream<R> zipStreams<A, B, R>(
  Stream<A> a,
  Stream<B> b,
  R combine(A, B)
)

Pairs a and b by index, emitting combine(aᵢ, bᵢ) for each index until either stream completes (the standard "zip" operator).

Unlike combineLatestStreams, zipStreams takes one value from each stream before every emission, so the two streams advance in lock-step. Values are requested from a first and then from b. Values on the longer stream that have no partner are never emitted. Both source subscriptions are cancelled when the result stream completes or is cancelled.
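
The lock-step behaviour can be seen by zipping a stream that delivers its values immediately with one that emits on a timer. The following is a minimal sketch, not part of the documented API: lockStepDemo, fast, and slow are hypothetical names, the 20 ms interval is arbitrary, and zipStreams is repeated locally (following the Implementation section) only so the snippet runs on its own.

```dart
import 'dart:async';

// Local copy of zipStreams, following the Implementation section.
Stream<R> zipStreams<A, B, R>(
    Stream<A> a, Stream<B> b, R Function(A, B) combine) async* {
  final ia = StreamIterator<A>(a);
  final ib = StreamIterator<B>(b);
  try {
    while (await ia.moveNext() && await ib.moveNext()) {
      yield combine(ia.current, ib.current);
    }
  } finally {
    await ia.cancel();
    await ib.cancel();
  }
}

// Hypothetical demo: `fast` has four values ready at once, `slow` emits
// three values, one every 20 ms.
Future<List<String>> lockStepDemo() {
  final fast = Stream<int>.fromIterable([1, 2, 3, 4]);
  final slow = Stream<String>.periodic(
    const Duration(milliseconds: 20),
    (i) => 'abc'[i],
  ).take(3);
  // Each pair has to wait for the next value from `slow`; the fourth value
  // of `fast` has no partner and is never emitted.
  return zipStreams(fast, slow, (int n, String s) => '$n$s').toList();
}

Future<void> main() async {
  print(await lockStepDemo()); // [1a, 2b, 3c]
}
```

The result is paced by the slower stream and stops as soon as it completes, regardless of how many values the faster stream still holds.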

Example:

zipStreams(Stream.fromIterable([1, 2, 3]),
           Stream.fromIterable(['a', 'b']),
           (int n, String s) => '$n$s'); // emits '1a', '2b'; the unpaired 3 is dropped

Audited: 2026-06-12 11:26 EDT

Implementation

Stream<R> zipStreams<A, B, R>(
    Stream<A> a, Stream<B> b, R Function(A, B) combine) async* {
  final StreamIterator<A> ia = StreamIterator<A>(a);
  final StreamIterator<B> ib = StreamIterator<B>(b);
  try {
    // moveNext is awaited for BOTH before yielding, so a half-pair at the end
    // (one stream still has a value, the other is exhausted) is never emitted.
    while (await ia.moveNext() && await ib.moveNext()) {
      yield combine(ia.current, ib.current);
    }
  } finally {
    // Cancel ib even if cancelling ia throws.
    try {
      await ia.cancel();
    } finally {
      await ib.cancel();
    }
  }
}
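
The finally block is what releases the source subscriptions when a listener stops early. The sketch below is one way to observe this, under stated assumptions: cancelDemo and log are hypothetical names, the onCancel callbacks exist only to record that each source was cancelled, and zipStreams is repeated locally so the snippet is self-contained.

```dart
import 'dart:async';

// Local copy of zipStreams, following the listing above.
Stream<R> zipStreams<A, B, R>(
    Stream<A> a, Stream<B> b, R Function(A, B) combine) async* {
  final ia = StreamIterator<A>(a);
  final ib = StreamIterator<B>(b);
  try {
    while (await ia.moveNext() && await ib.moveNext()) {
      yield combine(ia.current, ib.current);
    }
  } finally {
    await ia.cancel();
    await ib.cancel();
  }
}

// Hypothetical demo: take only the first pair, then check that both
// source controllers were told their subscription was cancelled.
Future<List<String>> cancelDemo() async {
  final log = <String>[];
  final a = StreamController<int>(onCancel: () {
    log.add('a cancelled');
  });
  final b = StreamController<String>(onCancel: () {
    log.add('b cancelled');
  });
  a.add(1);
  a.add(2);
  b.add('x');
  b.add('y');

  // `first` cancels the zipped stream after one event, which runs the
  // generator's finally block.
  final first =
      await zipStreams(a.stream, b.stream, (int n, String s) => '$n$s').first;
  // Yield to the event loop so any pending cancellation work has finished.
  await Future<void>.delayed(Duration.zero);
  log.add('first = $first');
  return log;
}

Future<void> main() async {
  print(await cancelDemo());
}
```

Neither controller is ever closed here, so the cancellation entries in log can only come from zipStreams cancelling its iterators.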