combineLatestAll method

Stream<List<T>> combineLatestAll(
  1. Iterable<Stream<T>> others
)

Combine the latest value emitted from the source stream with the latest values emitted from others.

combineLatestAll subscribes to the source stream and others and when any one of the streams emits, the result stream will emit a List<T> of the latest values emitted from all streams.

No event will be emitted until all source streams emit at least once. If a source stream emits multiple values before another starts emitting, all but the last value will be discarded. Once all source streams have emitted at least once, the result stream will emit any time any source stream emits.

The result stream will not close until all source streams have closed. When a source stream closes, the result stream will continue to emit the last value from the closed stream when the other source streams emit until the result stream has closed. If a source stream closes without emitting any value, the result stream will close as well.

For example:

final combined = first
    .combineLatestAll([second, third])
    .map((data) => data.join());

first:    a----b------------------c--------d---|
second:   --1---------2-----------------|
third:    -------&----------%---|
combined: -------b1&--b2&---b2%---c2%------d2%-|

Errors thrown by any source stream will be forwarded to the result stream.

If the source stream is a broadcast stream, the result stream will be as well, regardless of the types of others. If a single subscription stream is combined with a broadcast source stream, it may never be canceled.

Implementation

Stream<List<T>> combineLatestAll(Iterable<Stream<T>> others) {
  final controller = isBroadcast
      ? StreamController<List<T>>.broadcast(sync: true)
      : StreamController<List<T>>(sync: true);

  final allStreams = [
    this,
    for (final other in others)
      !isBroadcast || other.isBroadcast ? other : other.asBroadcastStream(),
  ];

  controller.onListen = () {
    final subscriptions = <StreamSubscription<T>>[];

    final latestData = List<T?>.filled(allStreams.length, null);
    final hasEmitted = <int>{};
    void handleData(int index, T data) {
      latestData[index] = data;
      hasEmitted.add(index);
      if (hasEmitted.length == allStreams.length) {
        controller.add(List.from(latestData));
      }
    }

    var streamId = 0;
    for (final stream in allStreams) {
      final index = streamId;

      final subscription = stream.listen((data) => handleData(index, data),
          onError: controller.addError);
      subscription.onDone(() {
        assert(subscriptions.contains(subscription));
        subscriptions.remove(subscription);
        if (subscriptions.isEmpty || !hasEmitted.contains(index)) {
          controller.close();
        }
      });
      subscriptions.add(subscription);

      streamId++;
    }
    if (!isBroadcast) {
      controller
        ..onPause = () {
          for (final subscription in subscriptions) {
            subscription.pause();
          }
        }
        ..onResume = () {
          for (final subscription in subscriptions) {
            subscription.resume();
          }
        };
    }
    controller.onCancel = () {
      if (subscriptions.isEmpty) return null;
      var cancels = [for (var s in subscriptions) s.cancel()]
        // Handle opt-out nulls
        ..removeWhere((Object? f) => f == null);
      if (cancels.isEmpty) return null;
      return Future.wait(cancels).then((_) => null);
    };
  };
  return controller.stream;
}