fromStreamEither<T> static method
OffsetIterator<Either<dynamic, T>> fromStreamEither<T>(
  Stream<T> stream, {
  int retention = 0,
  SeedCallback<Either<dynamic, T>>? seed,
  String name = 'OffsetIterator.fromStreamEither',
})
Creates an OffsetIterator from the provided Stream.
If a ValueStream with a seed is given, it will populate the iterator's seed value.
Items are wrapped in E.Either, so errors don't cancel the subscription.
Implementation
/// Builds an [OffsetIterator] over [stream] whose items are `E.Either`
/// values: data events are emitted as `E.right`, error events as `E.left`.
///
/// If [stream] exposes a non-null `valueOrNull` (looked up dynamically, as a
/// ValueStream would provide), the first event of [stream] is skipped and
/// that value, wrapped in `E.right`, becomes the default seed.
///
/// - [retention] is forwarded unchanged to the [OffsetIterator].
/// - [seed], when given, is used instead of the default seed callback.
/// - [name] is forwarded unchanged to the [OffsetIterator].
static OffsetIterator<E.Either<dynamic, T>> fromStreamEither<T>(
  Stream<T> stream, {
  int retention = 0,
  SeedCallback<E.Either<dynamic, T>>? seed,
  String name = 'OffsetIterator.fromStreamEither',
}) {
  // Probe for a current value through a dynamic lookup. Presumably a failed
  // lookup or a null value both end up as "no value" here - confirm against
  // the `O` helpers.
  final currentValue = O
      .tryCatch(() => (stream as dynamic).valueOrNull as T?)
      .p(O.chainNullableK(identity));

  // When a current value was found, drop the first event of the stream; the
  // value is surfaced through the seed callback below instead.
  final Stream<T> source = currentValue.p(O.fold(
    () => stream,
    (_) => stream.skip(1),
  ));

  // Turn both data and error events into ordinary data events.
  final wrapInEither =
      StreamTransformer<T, E.Either<dynamic, T>>.fromHandlers(
    handleData: (data, sink) => sink.add(E.right(data)),
    handleError: (error, stack, sink) => sink.add(E.left(error)),
  );
  final eitherStream = source.transform(wrapInEither);

  return OffsetIterator(
    name: name,
    retention: retention,
    seed: seed ?? () => currentValue.p(O.map(E.right)),
    init: () => StreamIterator(eitherStream),
    process: (acc) async {
      final iterator = acc as StreamIterator<E.Either<dynamic, T>>;

      // Pull exactly one item per call; a false result means the stream
      // has finished.
      if (await iterator.moveNext()) {
        return OffsetIteratorState(
          acc: iterator,
          chunk: [iterator.current],
          hasMore: true,
        );
      }

      return OffsetIteratorState(acc: iterator, hasMore: false);
    },
    cleanup: (acc) => (acc as StreamIterator).cancel(),
  );
}