mapConcurrent<R> method
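Runs action on each element of this iterable, keeping at most parallelism tasks in flight at any time. If parallelism is zero or negative, the returned future completes with an ArgumentError.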
Results are returned in completion order, not input order.
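If any task fails, the returned future completes with the first error observed and no further elements are started. Tasks already in flight are not cancelled: they run to completion, their results are discarded, and any errors they raise are handled internally so that they do not surface as unhandled exceptions.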
Implementation
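Future<List<R>> mapConcurrent<R>(
  Future<R> Function(E item) action, {
  int parallelism = 1,
}) async {
  if (parallelism <= 0) throw ArgumentError('Parallelism must be positive');
  final results = <R>[];
  final active = <Future<void>>{};
  final iterator = this.iterator;
  try {
    while (iterator.moveNext()) {
      // Back-pressure: wait for a free slot before starting another task.
      while (active.length >= parallelism) {
        await Future.any(active);
      }
      final item = iterator.current;
      late Future<void> task;
      // Results are appended as tasks finish, which gives completion order.
      task = action(item).then((result) {
        results.add(result);
      }).whenComplete(() {
        // Each task removes itself once it settles, whether it succeeded or failed.
        active.remove(task);
      });
      active.add(task);
    }
    // The input is exhausted; wait for the tasks that are still running.
    await Future.wait(active);
  } catch (_) {
    // Attach no-op handlers so that late failures from in-flight tasks
    // are not reported as unhandled errors.
    for (final task in active) {
      // ignore: unawaited_futures
      task.catchError((_) {});
    }
    rethrow;
  }
  return results;
}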
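The behaviour described above can be exercised with a short program. The following is a minimal sketch, not part of the library: it assumes the method lives in an extension on Iterable<E> (the name MapConcurrentSketch is hypothetical, since the source does not show the enclosing declaration), copies the method body from the listing above, and uses made-up delays to force a particular completion order.

```dart
import 'dart:async';

// Hypothetical wrapper: the enclosing declaration is not shown in the source.
// The method body is copied from the implementation above.
extension MapConcurrentSketch<E> on Iterable<E> {
  Future<List<R>> mapConcurrent<R>(
    Future<R> Function(E item) action, {
    int parallelism = 1,
  }) async {
    if (parallelism <= 0) throw ArgumentError('Parallelism must be positive');
    final results = <R>[];
    final active = <Future<void>>{};
    final iterator = this.iterator;
    try {
      while (iterator.moveNext()) {
        while (active.length >= parallelism) {
          await Future.any(active);
        }
        final item = iterator.current;
        late Future<void> task;
        task = action(item).then((result) {
          results.add(result);
        }).whenComplete(() {
          active.remove(task);
        });
        active.add(task);
      }
      await Future.wait(active);
    } catch (_) {
      for (final task in active) {
        // ignore: unawaited_futures
        task.catchError((_) {});
      }
      rethrow;
    }
    return results;
  }
}

/// Each item sleeps for its own value in milliseconds, two tasks at a time.
/// 300 and 100 start together; 50 starts only after 100 has finished.
Future<List<int>> completionOrderDemo() {
  return [300, 100, 50].mapConcurrent((ms) async {
    await Future<void>.delayed(Duration(milliseconds: ms));
    return ms;
  }, parallelism: 2);
}

/// Item 1 fails first; items 2 and 3 are already running and still finish.
Future<String> failureDemo() async {
  final finished = <int>[];
  try {
    await [1, 2, 3].mapConcurrent((n) async {
      await Future<void>.delayed(Duration(milliseconds: n * 20));
      if (n == 1) throw StateError('item 1 failed');
      finished.add(n);
      return n;
    }, parallelism: 3);
    return 'no error';
  } on StateError catch (e) {
    return '${e.message}; finished: $finished';
  }
}

Future<void> main() async {
  print(await completionOrderDemo()); // [100, 50, 300]
  print(await failureDemo()); // item 1 failed; finished: [2, 3]
}
```

The first result shows both properties at once: the list is in completion order rather than input order, and 50 lands after 100 only because the limit of two kept it waiting for a slot. In the second demo all three tasks are started before the failure, so the call waits for the other two to finish before reporting the error. With a lower parallelism the error would surface as soon as the failing task completed, while any remaining in-flight task carried on in the background.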