fromStreamEither<T> static method

OffsetIterator<Either<dynamic, T>> fromStreamEither<T>(
  Stream<T> stream, {
  int retention = 0,
  SeedCallback<Either<dynamic, T>>? seed,
  String name = 'OffsetIterator.fromStreamEither',
})

Creates an OffsetIterator from the provided Stream. If the stream is a ValueStream that already holds a value, that value populates the iterator's seed value (unless a custom seed is supplied).

Items are wrapped in E.Either — data events as right, errors as left — so errors don't cancel the subscription.

Implementation

/// Builds an [OffsetIterator] that pulls its items from [stream].
///
/// Data events are emitted as `E.right` and error events as `E.left`, so an
/// error is forwarded as an ordinary item instead of surfacing as a stream
/// error.
///
/// When [stream] exposes a non-null `valueOrNull` (looked up dynamically),
/// that value becomes the default seed and the first event of [stream] is
/// skipped. A caller-supplied [seed] replaces the default seed. [retention]
/// and [name] are forwarded to the [OffsetIterator] unchanged.
static OffsetIterator<E.Either<dynamic, T>> fromStreamEither<T>(
  Stream<T> stream, {
  int retention = 0,
  SeedCallback<E.Either<dynamic, T>>? seed,
  String name = 'OffsetIterator.fromStreamEither',
}) {
  // Probe for a current value without depending on a concrete stream type.
  // NOTE(review): presumably a failed lookup or a null value both yield
  // "none" here — confirm against the option helpers' documentation.
  final currentValue = O
      .tryCatch(() => (stream as dynamic).valueOrNull as T?)
      .p(O.chainNullableK(identity));

  // Drop the first event whenever a current value was found; it is already
  // covered by the default seed below.
  final Stream<T> source = currentValue.p(O.fold(
    () => stream,
    (_) => stream.skip(1),
  ));

  // Route both data and errors into the sink as Either values.
  final wrapper = StreamTransformer<T, E.Either<dynamic, T>>.fromHandlers(
    handleData: (data, sink) => sink.add(E.right(data)),
    handleError: (error, stack, sink) => sink.add(E.left(error)),
  );
  final wrapped = source.transform(wrapper);

  return OffsetIterator(
    name: name,
    retention: retention,
    seed: seed ?? () => currentValue.p(O.map(E.right)),
    init: () => StreamIterator(wrapped),
    process: (acc) async {
      final events = acc as StreamIterator<E.Either<dynamic, T>>;

      // Emit one item per call for as long as the stream keeps producing.
      if (await events.moveNext()) {
        return OffsetIteratorState(
          acc: events,
          chunk: [events.current],
          hasMore: true,
        );
      }

      return OffsetIteratorState(acc: events, hasMore: false);
    },
    cleanup: (acc) => (acc as StreamIterator).cancel(),
  );
}