transform<R> method

OffsetIterator<R> transform<R>(
  FutureOr<List<R>?> pred(T), {
  String name = 'transform',
  SeedCallback<R>? seed,
  int retention = 0,
  int concurrency = 1,
  bool? cancelOnError,
  bool bubbleCancellation = true,
})

Implementation

/// Returns a new [OffsetIterator] whose chunks are produced by applying
/// [pred] to each item pulled from this iterator.
///
/// For every pulled item that holds a value, [pred] is invoked (its result may
/// be synchronous or a [Future]) and the returned list is emitted as the next
/// chunk. If [pred] yields `null` for a value, an empty chunk is emitted and
/// the resulting state reports `hasMore: false`. If the pulled option holds no
/// value, a `null` chunk is emitted and `hasMore` mirrors this iterator's
/// `hasMore()`.
///
/// * [name] is passed through `toStringWithChild` to label the new iterator.
/// * [seed] and [retention] are handed to the new iterator unchanged.
/// * [concurrency] greater than `1` delegates to `transformConcurrent`.
/// * [cancelOnError] falls back to this iterator's `cancelOnError` when null.
/// * [bubbleCancellation] is forwarded to this iterator's `generateCleanup`.
///
/// NOTE(review): the `concurrency > 1` path forwards only [pred],
/// [concurrency], [seed] and [retention]; [name], [cancelOnError] and
/// [bubbleCancellation] are not passed on. Confirm whether that is intended.
OffsetIterator<R> transform<R>(
  FutureOr<List<R>?> Function(T) pred, {
  String name = 'transform',
  SeedCallback<R>? seed,
  int retention = 0,
  int concurrency = 1,
  bool? cancelOnError,
  bool bubbleCancellation = true,
}) {
  if (concurrency > 1) {
    return transformConcurrent(
      pred,
      concurrency: concurrency,
      seed: seed,
      retention: retention,
    );
  }

  final source = this;

  // Builds the state for one processing step from the pulled option and the
  // list (if any) that `pred` produced for it.
  OffsetIteratorState<R> buildState(Option<T> pulled, List<R>? produced) {
    if (!O.isSome(pulled)) {
      // Nothing was pulled this round: emit no chunk and defer to the source.
      return OffsetIteratorState(chunk: null, hasMore: source.hasMore());
    }

    // A value was pulled. A null result from `pred` yields an empty chunk and
    // reports hasMore as false without consulting the source.
    return OffsetIteratorState(
      chunk: produced ?? const [],
      hasMore: produced != null && source.hasMore(),
    );
  }

  // Runs `pred` on the pulled value (when there is one), staying synchronous
  // unless `pred` itself returns a Future.
  FutureOr<OffsetIteratorState<R>> applyPred(Option<T> pulled) {
    if (pulled is! Some) {
      return buildState(pulled, null);
    }

    final produced = pred((pulled as Some).value);
    if (produced is Future) {
      return (produced as Future<List<R>?>)
          .then((resolved) => buildState(pulled, resolved));
    }

    return buildState(pulled, produced);
  }

  return OffsetIterator(
    name: toStringWithChild(name),
    process: (_) {
      final pulled = source.pull();
      if (pulled is Future) {
        return (pulled as Future<Option<T>>).then(applyPred);
      }
      return applyPred(pulled);
    },
    cleanup: source.generateCleanup(bubbleCancellation: bubbleCancellation),
    cancelOnError: cancelOnError ?? source.cancelOnError,
    seed: seed,
    retention: retention,
  );
}