listen method

@override
StreamSubscription<List<T>> listen(
  void onData(List<T>)?, {
  Function? onError,
  void onDone()?,
  bool? cancelOnError,
})

Adds a subscription to this stream.

Returns a StreamSubscription which handles events from this stream using the provided onData, onError and onDone handlers. The handlers can be changed on the subscription, but they start out as the provided functions.
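
As a minimal sketch of replacing a handler after subscribing, the following uses a plain `Stream.fromIterable` from `dart:async` rather than this class. The function name `swapHandlerDemo` and the log strings are hypothetical and only illustrate the behaviour described above.

```dart
import 'dart:async';

/// Listens with an initial data handler, then swaps it after the first event.
Future<List<String>> swapHandlerDemo() async {
  final log = <String>[];
  final done = Completer<void>();
  late StreamSubscription<int> sub;
  sub = Stream.fromIterable([1, 2, 3]).listen(
    (value) {
      log.add('first: $value');
      // Replace the data handler; later events go to the new function.
      sub.onData((v) => log.add('second: $v'));
    },
    onDone: done.complete,
  );
  await done.future;
  return log;
}

Future<void> main() async {
  print(await swapHandlerDemo()); // [first: 1, second: 2, second: 3]
}
```

Only the first event reaches the handler passed to `listen`; the remaining events are delivered to the function installed through `sub.onData`.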

On each data event from this stream, the subscriber's onData handler is called. If onData is null, nothing happens.

On errors from this stream, the onError handler is called with the error object and possibly a stack trace.

The onError callback must be of type void Function(Object error) or void Function(Object error, StackTrace). The function type determines whether onError is invoked with a stack trace argument: the two-argument form is called with the error object and a stack trace, which may be StackTrace.empty if this stream received an error without a stack trace; the one-argument form is called with just the error object.

If onError is omitted, any errors on this stream are considered unhandled and are passed to the current Zone's error handler. By default, unhandled async errors are treated as if they were uncaught top-level errors.
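
The two accepted onError shapes can be sketched as follows. This is an illustration on an ordinary `async*` stream, not on this class; `onErrorShapesDemo` and the local `failing` generator are hypothetical names.

```dart
import 'dart:async';

/// Subscribes twice to a failing stream, once per accepted onError shape.
Future<List<String>> onErrorShapesDemo() async {
  final log = <String>[];

  // Emits one value, then an error; the stream closes after the throw.
  Stream<int> failing() async* {
    yield 1;
    throw StateError('boom');
  }

  // One-argument form: called with just the error object.
  final first = Completer<void>();
  failing().listen(
    (v) => log.add('data $v'),
    onError: (Object error) => log.add('one-arg: ${error.runtimeType}'),
    onDone: first.complete,
  );
  await first.future;

  // Two-argument form: called with the error object and a stack trace.
  final second = Completer<void>();
  failing().listen(
    (v) => log.add('data $v'),
    onError: (Object error, StackTrace stack) =>
        log.add('two-arg: ${error.runtimeType}'),
    onDone: second.complete,
  );
  await second.future;

  return log;
}

Future<void> main() async {
  print(await onErrorShapesDemo());
}
```

Because the parameter is typed as `Function?`, a callback with any other shape is not rejected at compile time, so it is worth keeping to one of these two signatures.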

If this stream closes and sends a done event, the onDone handler is called. If onDone is null, nothing happens.

If cancelOnError is true, the subscription is automatically canceled when the first error event is delivered. The default is false.
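
The effect of cancelOnError can be seen in a small sketch that feeds the same events to a subscription with either setting. It uses a plain `StreamController` from `dart:async`; the function name `cancelOnErrorDemo` is hypothetical.

```dart
import 'dart:async';

/// Sends data, an error, more data, and done, and records what the listener sees.
Future<List<String>> cancelOnErrorDemo({required bool cancelOnError}) async {
  final log = <String>[];
  final controller = StreamController<int>();
  controller.stream.listen(
    (v) => log.add('data $v'),
    onError: (Object e) => log.add('error $e'),
    onDone: () => log.add('done'),
    cancelOnError: cancelOnError,
  );
  controller.add(1);
  controller.addError('oops');
  controller.add(2);
  unawaited(controller.close());
  // Give the buffered events time to be delivered before reading the log.
  await Future<void>.delayed(Duration.zero);
  return log;
}

Future<void> main() async {
  print(await cancelOnErrorDemo(cancelOnError: false));
  // [data 1, error oops, data 2, done]
  print(await cancelOnErrorDemo(cancelOnError: true));
  // [data 1, error oops]
}
```

With cancelOnError set to true, the events queued after the error are dropped and onDone is never called, because the subscription no longer exists.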

While a subscription is paused, or when it has been canceled, the subscription doesn't receive events and none of the event handler functions are called.
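
A short sketch of pausing, again on a plain `Stream.fromIterable` and with the hypothetical name `pauseDemo`: the subscription pauses itself after the first event and is resumed by a timer, and nothing is delivered in between.

```dart
import 'dart:async';

/// Pauses after the first event and resumes from a timer.
Future<List<String>> pauseDemo() async {
  final log = <String>[];
  final done = Completer<void>();
  late StreamSubscription<int> sub;
  sub = Stream.fromIterable([1, 2, 3]).listen(
    (v) {
      log.add('data $v');
      if (v == 1) {
        // No handler is called until resume() runs.
        sub.pause();
        Timer(const Duration(milliseconds: 10), () {
          log.add('resuming');
          sub.resume();
        });
      }
    },
    onDone: done.complete,
  );
  await done.future;
  return log;
}

Future<void> main() async {
  print(await pauseDemo()); // [data 1, resuming, data 2, data 3]
}
```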

Implementation

@override
StreamSubscription<List<T>> listen(void Function(List<T>)? onData,
    {Function? onError, void Function()? onDone, bool? cancelOnError}) {
  cancelOnError = identical(true, cancelOnError);
  var subscriptions = <StreamSubscription<T>>[];
  late StreamController<List<T>> controller;
  late List<T?> current;
  var dataCount = 0;

  /// Called for each data from a subscription in [subscriptions].
  void handleData(int index, T data) {
    current[index] = data;
    dataCount++;
    if (dataCount == subscriptions.length) {
      var data = List<T>.from(current);
      current = List<T?>.filled(subscriptions.length, null);
      dataCount = 0;
      for (var i = 0; i < subscriptions.length; i++) {
        if (i != index) subscriptions[i].resume();
      }
      controller.add(data);
    } else {
      subscriptions[index].pause();
    }
  }

  /// Called for each error from a subscription in [subscriptions],
  /// unless [cancelOnError] is true, in which case [handleErrorCancel]
  /// is used instead.
  void handleError(Object error, StackTrace stackTrace) {
    controller.addError(error, stackTrace);
  }

  /// Called when a subscription has an error and [cancelOnError] is true.
  ///
  /// Prematurely cancels all subscriptions since we know that we won't
  /// be needing any more values.
  void handleErrorCancel(Object error, StackTrace stackTrace) {
    for (var i = 0; i < subscriptions.length; i++) {
      subscriptions[i].cancel();
    }
    controller.addError(error, stackTrace);
  }

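  /// Called when any subscription in [subscriptions] is done.
  ///
  /// Cancels all subscriptions and closes the output, since no further
  /// complete lists can be produced.
  void handleDone() {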
    for (var i = 0; i < subscriptions.length; i++) {
      subscriptions[i].cancel();
    }
    controller.close();
  }

  try {
    for (var stream in _streams) {
      var index = subscriptions.length;
      subscriptions.add(stream.listen((data) {
        handleData(index, data);
      },
          onError: cancelOnError ? handleErrorCancel : handleError,
          onDone: handleDone,
          cancelOnError: cancelOnError));
    }
  } catch (e) {
    for (var i = subscriptions.length - 1; i >= 0; i--) {
      subscriptions[i].cancel();
    }
    rethrow;
  }

  current = List<T?>.filled(subscriptions.length, null);

  controller = StreamController<List<T>>(onPause: () {
    for (var i = 0; i < subscriptions.length; i++) {
      // This may pause some subscriptions more than once.
      // These will not be resumed by onResume below, but must wait for the
      // next round.
      subscriptions[i].pause();
    }
  }, onResume: () {
    for (var i = 0; i < subscriptions.length; i++) {
      subscriptions[i].resume();
    }
  }, onCancel: () {
    for (var i = 0; i < subscriptions.length; i++) {
      // Canceling more than once is safe.
      subscriptions[i].cancel();
    }
  });

  if (subscriptions.isEmpty) {
    controller.close();
  }
  return controller.stream.listen(onData,
      onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
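
The pairing behaviour of the implementation above can be sketched with a usage example. This assumes the method belongs to the `StreamZip` class from `package:async`, which the page itself does not state; the function name `zipDemo` is hypothetical, and the snippet needs `package:async` as a dependency.

```dart
import 'dart:async';

// Assumption: this listen override is StreamZip.listen from package:async.
import 'package:async/async.dart';

/// Zips two streams of different lengths and collects the emitted lists.
Future<List<List<int>>> zipDemo() {
  final zipped = StreamZip<int>([
    Stream.fromIterable([1, 2, 3]),
    Stream.fromIterable([10, 20]),
  ]);
  // toList() calls listen() internally and completes on the done event.
  return zipped.toList();
}

Future<void> main() async {
  print(await zipDemo()); // [[1, 10], [2, 20]]
}
```

Each source is paused after delivering a value until every other source has delivered one too, so the output contains one list per complete round. The value 3 is never emitted: once the shorter stream is done, handleDone cancels the remaining subscriptions and closes the output.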