flatMapBatches<R> method

Stream<List<R>> flatMapBatches<R>(
  1. Stream<R> transform(
    1. T value
    ),
  2. int size
)

Convert each element to a Stream, then listen to at most size Stream at a time and outputs their results in size-sized batches, while maintaining order within each batch — subsequent batches of Streams are only listened to when the batch before it successfully completes. Errors will be forwarded downstream.

Marble

input     : --a---b---c---x---d-------e--|
transform : a -> a| (Stream.value)
size      : 3
output    : --------[a,b,c]---x----------[d,e]--|

NOTE: x is error event

Implementation

Stream<List<R>> flatMapBatches<R>(
  Stream<R> Function(T value) transform,
  int size,
) {
  Stream<List<R>> convert(List<T> streams) =>
      Rx.zipList(streams.map(transform).toList(growable: false));

  return bufferCount(size).asyncExpand(convert);
}