executeTasks method

  1. @override
TResolvableOption<T> executeTasks()
override

Executes all tasks in the queue concurrently using the `waitF` wait utility.

The tasks are started in parallel, and this method returns a Resolvable that will complete once all tasks have finished.

Implementation

/// Runs every queued task through `waitF` and returns the combined outcome.
///
/// The progress counters are reset first: `_executionCount` takes the current
/// number of [tasks] and `_executionIndex` restarts at zero. Each task is then
/// wrapped in a deferred factory which, when invoked:
///
/// 1. calls the task's handler with `Ok(None<T>())` as its input,
/// 2. applies a minimum duration (`_minTaskDuration` when set, otherwise the
///    task's own `minTaskDuration`),
/// 3. increments `_executionIndex` and calls `_onTaskCompleted` with the task
///    and the current `executionProgress`, substituting `syncUnit()` when
///    that yields `null`.
///
/// `_internalIsExecuting` is set to `true` before the work is wrapped in a
/// `Resolvable`, and set back to `false` inside the `whenComplete` callback.
/// The combiner handed to `waitF` ignores its argument and yields
/// `const None()`.
@override
TResolvableOption<T> executeTasks() {
  // Reset progress tracking for this run.
  _executionCount = tasks.length;
  _executionIndex = 0;

  // One factory per task; the handler is only called when a factory is
  // invoked.
  final taskRunners = tasks.map((task) {
    return () {
      final started = task.handler(Ok(None<T>()));
      final minDuration = _minTaskDuration ?? task.minTaskDuration;
      return started
          .withMinDuration(minDuration)
          .then((_) {
            _executionIndex++;
            return _onTaskCompleted?.call(task, executionProgress) ??
                syncUnit();
          })
          .flatten()
          .value;
    };
  });

  _internalIsExecuting = true;

  final pending = Resolvable(
    () => waitF<Option<T>>(
      taskRunners,
      (_) => const None(),
      eagerError: _eagerError,
      onError: (error, stackTrace) {
        // An error that is already an Err is forwarded untouched; anything
        // else is wrapped together with its stack trace. This keeps extra
        // data on a thrown Err — e.g. `Err('alarm', statusCode: 503)` —
        // intact by the time it reaches the user's `onError`.
        final err =
            error is Err ? error : Err<Object>(error, stackTrace: stackTrace);
        return _onError?.call(err).value;
      },
    ),
  );

  return pending.whenComplete((outcome) {
    _internalIsExecuting = false;
    return outcome;
  });
}