triggerJobRuns method

Future<int> triggerJobRuns()

Implementation

/// Starts a run over every job currently in [_queue] and reports how many of
/// them succeeded.
///
/// If a run is already active ([_currentStream] is non-null) the call is
/// ignored and the returned future completes with `0`.
///
/// Otherwise each queued job's `runner` is invoked right away, and the
/// results are then awaited in queue order. Every job that completes
/// successfully is removed from [_queue]. Once all jobs have succeeded,
/// [_errorCount] and [_lastError] are reset and the returned future completes
/// with the number of successful jobs.
///
/// On the first failure reached while awaiting, the run is cancelled,
/// [_errorCount] is incremented, [_lastError] is set to the current time and
/// the returned future completes with that error. Jobs that did not succeed
/// stay in [_queue], unless the queue has grown beyond [maxQueueSize], in
/// which case it is cleared.
Future<int> triggerJobRuns() {
  if (_currentStream != null) {
    _logger.info('Already running jobs. Ignoring trigger.');
    return Future.value(0);
  }
  _logger.finest('Triggering Job Runs. ${_queue.length}');
  final completer = Completer<int>();
  var successfulJobs = 0;
  _currentStream = (() async* {
    // Materialise the list up front: this starts every job immediately and
    // lets the listener below remove entries from `_queue` without
    // disturbing the iteration.
    final pendingJobs = _queue.map((job) async {
      await job.runner(job).drain(null);
      return job;
    }).toList(growable: false);
    // All jobs are already running, but they are awaited one at a time. A
    // job that fails before its turn (or after the run was cancelled) would
    // otherwise complete with an error nobody listens to, which Dart reports
    // as an uncaught asynchronous error. Attaching a no-op error listener
    // prevents that; the ordered `await` below still receives the error.
    for (final pending in pendingJobs) {
      pending.then<void>((_) {}, onError: (Object _) {});
    }
    for (final pending in pendingJobs) {
      yield await pending;
    }
  })()
      .listen((successJob) {
    _queue.remove(successJob);
    successfulJobs++;
    _logger.finest(
        'Success job. remaining: ${_queue.length} - completed: $successfulJobs');
  }, onDone: () {
    _logger.finest('All jobs done.');
    _errorCount = 0;
    _lastError = null;

    _currentStream = null;
    completer.complete(successfulJobs);
  }, onError: (Object error, StackTrace stackTrace) {
    _logger.warning('Error while executing job', error, stackTrace);
    _errorCount++;
    final failedAt = DateTime.now();
    _lastError = failedAt;
    // Cancelling stops further events, so `onDone` will not fire and the
    // completer is completed exactly once.
    _currentStream?.cancel();
    _currentStream = null;
    completer.completeError(error, stackTrace);

    const errorWait = 10;
    final minWait =
        Duration(seconds: errorWait * (_errorCount * _errorCount + 1));
    // NOTE(review): `failedAt` was taken a few statements above, so the
    // elapsed time is close to zero and this branch is effectively always
    // taken. Nothing in this method actually waits for `minWait`.
    if (failedAt.difference(DateTime.now()).abs().compareTo(minWait) < 0) {
      _logger.finest('There was an error. waiting at least $minWait');
      if (_queue.length > maxQueueSize) {
        _logger.finest('clearing log buffer. ${_queue.length}');
        _queue.clear();
      }
    }
  });

  return completer.future;
}