semaphoreMap<V> method

Stream<V> semaphoreMap<V>(
  int bufferSize,
  Future<V> mapper(T t)
)

asyncMap awaits the mapper one item at a time: INPUT -> (await) MAPPER -> OUTPUT

semaphoreMap instead accumulates bufferSize items and maps them together: INPUT -> (accumulate bufferSize) -> EXPAND Future.wait(buffer.map(MAPPER)) -> OUTPUT

Results are still emitted in input order, but bufferSize limits the number of mapper calls that run concurrently. Where asyncMap processes 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20 (each comma marks a wait), semaphoreMap with a bufferSize of 4 processes 1 2 3 4, 5 6 7 8, 9 10 11 12, 13 14 15 16, 17 18 19 20: the items within a group run concurrently, and a wait occurs only between groups.
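The batching described above can be illustrated with a minimal, self-contained sketch. It is not the library's implementation: semaphoreMapSketch is a hypothetical free function that takes the source stream as a parameter and does not use accumulateBy, and it assumes that a final partial batch is flushed when the source closes.

```dart
import 'dart:async';

/// Hypothetical stand-in for semaphoreMap, written without accumulateBy.
/// Collects up to [bufferSize] items, maps them concurrently, then emits
/// the results in input order before reading the next batch.
Stream<V> semaphoreMapSketch<T, V>(
  Stream<T> source,
  int bufferSize,
  Future<V> Function(T t) mapper,
) async* {
  var buffer = <T>[];
  await for (final item in source) {
    buffer.add(item);
    if (buffer.length >= bufferSize) {
      // Future.wait returns results in the order of the input futures,
      // regardless of which future completes first.
      final results = await Future.wait(buffer.map(mapper));
      buffer = <T>[];
      for (final result in results) {
        yield result;
      }
    }
  }
  // Assumption: a trailing partial batch is still processed.
  if (buffer.isNotEmpty) {
    final results = await Future.wait(buffer.map(mapper));
    for (final result in results) {
      yield result;
    }
  }
}

Future<void> main() async {
  final source = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
  final out = await semaphoreMapSketch<int, int>(source, 4, (n) async {
    // Vary the delay so items inside a batch finish out of order.
    await Future<void>.delayed(Duration(milliseconds: 10 * (5 - n % 5)));
    return n * 2;
  }).toList();
  print(out); // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
}
```

With a bufferSize of 4 the ten inputs are handled as three batches (4, 4, and 2 items), so at most four mapper calls are in flight at once while the output order matches the input order.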

Implementation

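Stream<V> semaphoreMap<V>(int bufferSize, Future<V> Function(T t) mapper) =>
    // Group the input into chunks; every item is given a weight of 1.
    accumulateBy(
      bufferSize,
      (i) => 1,
    )
        // Map all items of a chunk concurrently and wait for the whole chunk.
        .asyncMap((chunk) => Future.wait(chunk.map(mapper)))
        // Flatten each list of results back into single events, in order.
        .expand((results) => results);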