semaphoreMap<V> method
Unlike asyncMap, which awaits one operation at a time: INPUT -> (await) MAPPER -> OUTPUT
semaphoreMap runs the mapper in batches: INPUT -> (accumulate bufferSize items) -> Future.wait(buffer.map(MAPPER)) -> EXPAND -> OUTPUT
Results are still emitted in input order, but bufferSize limits the number of concurrent operations. Each batch must complete in full before the next one starts. For 20 inputs with a bufferSize of 4, asyncMap processes 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20 (each comma marks a wait), whereas semaphoreMap processes 1 2 3 4, 5 6 7 8, 9 10 11 12, 13 14 15 16, 17 18 19 20 (waiting only at the commas).
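The ordering guarantee comes from Future.wait, which returns results in the order of the futures passed to it, regardless of which one finishes first. A minimal sketch of a single batch, assuming a hypothetical mapper named slowDouble in which later inputs finish sooner:

```dart
import 'dart:async';

/// Hypothetical mapper (not part of the library): larger inputs finish sooner.
Future<int> slowDouble(int n) =>
    Future.delayed(Duration(milliseconds: 50 - n * 10), () => n * 2);

/// Runs one batch concurrently. Future.wait keeps the results in input order.
Future<List<int>> runBatch(List<int> batch) =>
    Future.wait(batch.map(slowDouble));

Future<void> main() async {
  // 4 completes first and 1 completes last, yet the list follows input order.
  print(await runBatch([1, 2, 3, 4])); // [2, 4, 6, 8]
}
```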
Implementation
Stream<V> semaphoreMap<V>(int bufferSize, Future<V> Function(T t) mapper) =>
    accumulateBy(
      bufferSize,
      (i) => 1,
    ).asyncMap((i) => Future.wait(i.map(mapper))).expand((i) => i);
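To try the same pipeline without the library, the following is a self-contained sketch using only dart:async. The names SemaphoreMapSketch, chunked, and semaphoreMapSketch are hypothetical. chunked is a stand-in for accumulateBy(bufferSize, (i) => 1), and it assumes that a final partial batch is flushed when the source stream closes, which may not match the real accumulateBy.

```dart
import 'dart:async';

extension SemaphoreMapSketch<T> on Stream<T> {
  /// Hypothetical stand-in for accumulateBy(bufferSize, (i) => 1):
  /// groups events into lists of at most [size] items.
  Stream<List<T>> chunked(int size) async* {
    var buffer = <T>[];
    await for (final item in this) {
      buffer.add(item);
      if (buffer.length >= size) {
        yield buffer;
        buffer = <T>[];
      }
    }
    // Assumption: flush whatever is left when the source closes.
    if (buffer.isNotEmpty) yield buffer;
  }

  /// Same shape as the implementation above, built on [chunked].
  Stream<V> semaphoreMapSketch<V>(
          int bufferSize, Future<V> Function(T t) mapper) =>
      chunked(bufferSize)
          .asyncMap((batch) => Future.wait(batch.map(mapper)))
          .expand((results) => results);
}

Future<void> main() async {
  final squares = await Stream.fromIterable(List.generate(10, (i) => i + 1))
      .semaphoreMapSketch(4, (n) async => n * n)
      .toList();
  print(squares); // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
}
```

Because asyncMap awaits each batch before pulling the next, at most bufferSize mapper calls are in flight at once, and one slow item holds up the rest of its batch as well as every later batch.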