run method

Stream<double> run({
  IFastErrorReporter? errorReporter,
})

Implementation

/// Runs every job in [jobs] one after another and reports overall progress.
///
/// Each value emitted by the returned stream is the cumulative progress
/// after a job has finished; it grows by `1 / jobs.length` per job. With an
/// empty [jobs] collection the stream completes without emitting a value.
///
/// When a run is already in progress (`_isRunning` is `true`) no new pipeline
/// is built and the current `_runner` is returned instead.
///
/// An error raised while a job is running is forwarded to [errorReporter]
/// (when one is provided) and then re-emitted on the returned stream. The
/// `takeWhile` guard stops pulling further jobs once an error was observed.
Stream<double> run({
  IFastErrorReporter? errorReporter,
}) {
  if (!_isRunning) {
    // Guard the division: with no jobs the divisor would be zero. A zero step
    // is never applied because the pipeline below emits nothing in that case.
    final progressStep = jobs.isEmpty
        ? Decimal.zero
        : (Decimal.one / Decimal.fromInt(jobs.length))
            .toDecimal(scaleOnInfinitePrecision: 32);
    var progress = Decimal.zero;
    var hasError = false;
    _isRunning = true;

    _runner = Stream.fromIterable(jobs)
        .takeWhile((FastJob job) => !hasError)
        .asyncExpand((FastJob job) {
          final completer = Completer<bool>();

          // The job is started from a frame callback; the completer bridges
          // that callback back into the stream pipeline.
          WidgetsBinding.instance.scheduleFrameCallback((_) {
            runZonedGuarded(() async {
              final response = await job.run(
                context,
                errorReporter: errorReporter,
              );

              // A zone error may already have failed the completer.
              if (!completer.isCompleted) {
                completer.complete(response);
              }
            }, (error, stackTrace) {
              // The zone handler can fire more than once; only the first
              // outcome may complete the completer. Keep the real stack trace.
              if (!completer.isCompleted) {
                completer.completeError(error, stackTrace);
              }
            });
          });

          return Stream.fromFuture(completer.future);
        })
        .map((_) {
          progress += progressStep;

          return progress.toDouble();
        })
        .handleError((Object error, StackTrace stackTrace) {
          hasError = true;

          if (error is FastJobError) {
            errorReporter?.recordError(
              error.source,
              error.stackTrace,
              reason: error.debugLabel,
            );
          } else if (error is BlocError) {
            errorReporter?.recordError(
              error.source,
              error.stackTrace,
              reason: error.message,
            );
          } else if (error is Error) {
            // Prefer the trace attached to the Error; otherwise fall back to
            // the trace delivered with the stream error event.
            errorReporter?.recordError(
              error,
              error.stackTrace ?? stackTrace,
            );
          } else {
            errorReporter?.recordError(error, stackTrace);
          }

          // Re-raise the same object so listeners still receive the error.
          throw error;
        })
        .doOnDone(() => _isRunning = false)
        .doOnCancel(() => _isRunning = false);
  }

  return _runner;
}