processQueue method

Future<List<QueueProcessResult>> processQueue({
  1. void onProgress(
    1. int completed,
    2. int total
    )?,
})

Process all queued requests sequentially.

Returns a list of QueueProcessResults. Failed requests that still have remaining retries are re-enqueued at the back of the queue.

The optional onProgress callback is invoked after each request with the (completed, total) counts.

Implementation

Future<List<QueueProcessResult>> processQueue({
  void Function(int completed, int total)? onProgress,
}) async {
  if (_isProcessing) {
    _logger.warning('RequestQueue: processQueue called while already '
        'processing — skipping');
    return [];
  }

  _isProcessing = true;
  final results = <QueueProcessResult>[];
  final total = _store.length;

  _logger.info('RequestQueue: processing $total queued request(s)');

  var completed = 0;

  while (!_store.isEmpty) {
    final request = _store.dequeue()!;
    try {
      final data = await _executeWithRetry(request);
      final result = QueueProcessResult(
        request: request,
        isSuccess: true,
        data: data,
      );
      results.add(result);
      if (!_resultController.isClosed) _resultController.add(result);
      _logger.info('RequestQueue: ✓ "${request.id}" succeeded');
    } catch (e) {
      final result = QueueProcessResult(
        request: request,
        isSuccess: false,
        error: e,
      );
      results.add(result);
      if (!_resultController.isClosed) _resultController.add(result);
      _logger.warning('RequestQueue: ✗ "${request.id}" failed — $e');
    }

    completed++;
    onProgress?.call(completed, total);
  }

  _isProcessing = false;
  _logger.info(
    'RequestQueue: processing complete — '
    '${results.where((r) => r.isSuccess).length}/$total succeeded',
  );
  return results;
}