triggerJobRuns method
Implementation
Future<int> triggerJobRuns() {
if (_currentStream != null) {
_logger.info('Already running jobs. Ignoring trigger.');
return Future.value(0);
}
_logger.finest('Triggering Job Runs. ${_queue.length}');
final completer = Completer<int>();
var successfulJobs = 0;
// final job = _queue.removeFirst();
_currentStream =
(() async* {
final copyQueue = _queue
.map((job) async {
await job.runner(job).drain(null);
return job;
})
.toList(growable: false);
for (final job in copyQueue) {
yield await job;
}
})().listen(
(successJob) {
_queue.remove(successJob);
successfulJobs++;
_logger.finest(
'Success job. remaining: ${_queue.length} - completed: $successfulJobs',
);
},
onDone: () {
_logger.finest('All jobs done.');
_errorCount = 0;
_lastError = null;
_currentStream = null;
completer.complete(successfulJobs);
},
onError: (Object error, StackTrace stackTrace) {
_logger.warning('Error while executing job', error, stackTrace);
_errorCount++;
_lastError = DateTime.now();
_currentStream!.cancel();
_currentStream = null;
completer.completeError(error, stackTrace);
const errorWait = 10;
final minWait = Duration(
seconds: errorWait * (_errorCount * _errorCount + 1),
);
if (_lastError!
.difference(DateTime.now())
.abs()
.compareTo(minWait) <
0) {
_logger.finest('There was an error. waiting at least $minWait');
if (_queue.length > maxQueueSize) {
_logger.finest('clearing log buffer. ${_queue.length}');
_queue.clear();
}
}
return Future.value(null);
},
);
return completer.future;
}