push method

  1. @override
Future<void> push(
  1. QueueJob job, {
  2. Duration? delay,
})
override

Pushes a QueueJob to the queue with optional delay.

The delay allows deferring the job execution.

Example:

await queueDriver.push(SendEmailJob(), delay: Duration(seconds: 30));

Implementation

@override
Future<void> push(QueueJob job, {Duration? delay}) async {
  final jobType = job.runtimeType.toString();
  _validateJobType(jobType);

  final payload = _asMap(job.toJson()) ?? <String, dynamic>{};
  if (!_isSafePayload(payload)) {
    throw QueueException('Rejected unsafe job payload for type $jobType');
  }

  final context = createJobContext(job, delay: delay);
  final command = await _getConnection();

  final jobData = {...context.toJson(), 'payload': payload};

  // Track metrics
  if (metrics != null) {
    metrics!.jobQueued(job.runtimeType.toString());
  }

  try {
    if (delay != null && delay > Duration.zero) {
      // Use sorted set for delayed jobs
      final score = DateTime.now().add(delay).millisecondsSinceEpoch;
      await command.send_object([
        'ZADD',
        _delayedQueue,
        score.toString(),
        jsonEncode(jobData),
      ]);
    } else {
      // Push to main queue
      await command.send_object(['LPUSH', _mainQueue, jsonEncode(jobData)]);
    }

    // Update queue depth metrics
    if (metrics != null) {
      final depth = await _getQueueDepth();
      metrics!.recordQueueDepth(depth);
    }
  } catch (e) {
    print('❌ Failed to push job to Redis: $e');
    rethrow;
  }
}