mapAsyncLimited<E> method

Stream<E> mapAsyncLimited<E>(
  FutureOr<E> convert(T event), {
  int maxPending = 1,
})

Implementation

/// Maps each event of this stream through [convert], limiting how many
/// asynchronous conversions may be in flight at the same time.
///
/// If [convert] returns a plain value, it is added to the returned stream
/// immediately. If it returns a [Future], that future counts as pending until
/// it completes, at which point its value (or error) is added to the returned
/// stream. Results are therefore emitted in completion order.
///
/// As soon as [maxPending] futures are outstanding the source subscription is
/// paused; it is resumed once the number of outstanding futures drops below
/// [maxPending] again. Values of [maxPending] smaller than 1 are treated as 1.
///
/// Errors thrown synchronously by [convert], errors of the returned futures
/// and errors of the source stream are forwarded as error events; the source
/// subscription is not cancelled on error.
///
/// The returned stream is a broadcast stream if this stream is one. It closes
/// after the source is done and every pending future has completed.
Stream<E> mapAsyncLimited<E>(FutureOr<E> convert(T event),
    {int maxPending = 1}) {
  late StreamController<E> output;
  late StreamSubscription<T> input;

  // A limit below 1 could never satisfy the resume condition and would leave
  // the source paused forever, so clamp it to the smallest useful value.
  final limit = maxPending < 1 ? 1 : maxPending;

  // Whether the source stream has reported that it is done.
  var isClosing = false;

  // The number of futures returned by [convert] that have not completed yet.
  var pending = 0;

  // Whether *this function* currently holds a pause on [input] because the
  // limit was reached. Tracked separately from `input.isPaused`, which is
  // also true when the downstream listener paused the output.
  var pausedForLimit = false;

  // Closes the output once the source is done and nothing is pending.
  void closeIfDrained() {
    if (isClosing && pending == 0) {
      output.close();
    }
  }

  void onListen() {
    final add = output.add;

    final void Function(Object, [StackTrace]) addError = output.addError;

    // A fresh subscription starts without any limiter-held pause.
    pausedForLimit = false;
    input = this.listen(
        (T event) {
          FutureOr<E> newValue;
          try {
            newValue = convert(event);
          } catch (e, s) {
            output.addError(e, s);
            return;
          }
          if (newValue is Future<E>) {
            // Count this future first, then pause as soon as the limit is
            // reached so that at most `limit` futures are ever in flight.
            pending++;
            if (pending >= limit && !pausedForLimit) {
              pausedForLimit = true;
              input.pause();
            }
            newValue.then(add, onError: addError).whenComplete(() {
              pending--;
              if (isClosing) {
                closeIfDrained();
              } else if (pausedForLimit && pending < limit) {
                // Release only the pause taken above; a pause requested by
                // the downstream listener stays in effect.
                pausedForLimit = false;
                input.resume();
              }
            });
          } else if (newValue is E) {
            output.add(newValue);
          } else {
            output.addError(
                Exception("Expected $E but found ${newValue?.runtimeType}"));
          }
        },
        cancelOnError: false,
        onError: addError,
        onDone: () {
          isClosing = true;
          closeIfDrained();
        });
  }

  if (this.isBroadcast) {
    output = StreamController<E>.broadcast(
        onListen: onListen,
        onCancel: () {
          input.cancel();
        },
        sync: true);
  } else {
    output = StreamController<E>(
        onListen: onListen,
        onPause: () {
          input.pause();
        },
        onResume: () {
          input.resume();
        },
        onCancel: () => input.cancel(),
        sync: true);
  }
  return output.stream;
}