triggerJobRuns method
Implementation
/// Runs the jobs currently held in [_queue] and reports how many succeeded.
///
/// If a run is already active ([_currentStream] is non-null) the trigger is
/// ignored and the returned future completes with `0`.
///
/// Otherwise the runner of every queued job is started up front and the
/// results are consumed in queue order. Each job whose runner stream drains
/// without error is removed from [_queue] and counted. Once every job has
/// finished, [_errorCount] and [_lastError] are reset and the returned future
/// completes with the number of successful jobs.
///
/// On the first error the run is cancelled, [_errorCount] and [_lastError]
/// are updated, and the returned future completes with that error. Jobs that
/// were not removed stay in [_queue] for a later trigger, unless the queue
/// has grown beyond [maxQueueSize], in which case it is cleared.
Future<int> triggerJobRuns() {
  if (_currentStream != null) {
    _logger.info('Already running jobs. Ignoring trigger.');
    return Future.value(0);
  }
  _logger.finest('Triggering Job Runs. ${_queue.length}');
  final completer = Completer<int>();
  var successfulJobs = 0;
  _currentStream = (() async* {
    // `toList` evaluates the map eagerly, so every job's runner is started
    // here; the loop below only decides the order in which results are
    // reported.
    // NOTE(review): a job that fails while it is not the one being awaited
    // has no error handler attached and may surface as an unhandled async
    // error - confirm whether that is acceptable for the runners in use.
    final pendingJobs = _queue.map((job) async {
      await job.runner(job).drain(null);
      return job;
    }).toList(growable: false);
    for (final pendingJob in pendingJobs) {
      yield await pendingJob;
    }
  })()
      .listen((successJob) {
    _queue.remove(successJob);
    successfulJobs++;
    _logger.finest(
        'Success job. remaining: ${_queue.length} - completed: $successfulJobs');
  }, onDone: () {
    _logger.finest('All jobs done.');
    _errorCount = 0;
    _lastError = null;
    _currentStream = null;
    completer.complete(successfulJobs);
  }, onError: (Object error, StackTrace stackTrace) {
    _logger.warning('Error while executing job', error, stackTrace);
    _errorCount++;
    _lastError = DateTime.now();
    // Stop consuming further results; `?.` avoids throwing from inside the
    // error handler should the subscription already have been cleared.
    _currentStream?.cancel();
    _currentStream = null;
    completer.completeError(error, stackTrace);
    const errorWait = 10;
    final minWait =
        Duration(seconds: errorWait * (_errorCount * _errorCount + 1));
    // The previous elapsed-time check compared `_lastError`, assigned a few
    // statements above, against the current time, so it always passed. It is
    // dropped; the logging and queue trimming it guarded are unchanged.
    _logger.finest('There was an error. waiting at least $minWait');
    if (_queue.length > maxQueueSize) {
      _logger.finest('clearing log buffer. ${_queue.length}');
      _queue.clear();
    }
  });
  return completer.future;
}