runTask<T> method

Future<T?> runTask<T>(
  Future<T> task(), {
  String? id,
  TaskPriority priority = TaskPriority.normal,
  int retries = 0,
  Duration? retryDelay,
  bool useExponentialBackoff = true,
  double weight = 1.0,
  void onError(Object error, StackTrace stackTrace)?,
  TaskCachePolicy<T>? cachePolicy,
})

Executes a task and automatically tracks its status in tasks.

  • task: The async function to execute.
  • id: A unique ID for the task (optional, generated if null).
  • priority: Task priority.
  • retries: Number of retry attempts.
  • retryDelay: Delay between retries.
  • useExponentialBackoff: Whether to apply exponential backoff to the delay between retries (default true).
  • weight: The weight of this task in totalProgress calculation (default 1.0).
  • onError: Custom error handler.
  • cachePolicy: If provided, the task result will be cached and reused until it expires.

Implementation

/// Executes [task] through the reactive task engine and tracks its status in
/// [tasks].
///
/// When [cachePolicy] is provided, a stored entry that has not yet expired is
/// deserialized and returned without running [task]; the entry is recorded in
/// [tasks] as an [LxSuccess] and cleanup is scheduled. An expired or
/// unreadable entry is deleted and the task runs normally. After a run that
/// yields a non-null result, the result is written back to the cache with an
/// expiry of now + `cachePolicy.ttl`.
///
/// Parameters:
/// * [task]: The async function to execute.
/// * [id]: Identifier used as the key in [tasks]; generated when null. Also
///   used as the cache key when `cachePolicy.key` is null.
/// * [priority], [retries], [retryDelay], [useExponentialBackoff]: Forwarded
///   unchanged to the engine's `schedule` call.
/// * [weight]: Stored in [taskWeights] for this task.
/// * [onError]: Invoked with the error and stack trace reported by the
///   engine; falls back to [onTaskError] when null.
/// * [cachePolicy]: Enables result caching as described above.
///
/// Returns the cached value on a cache hit, otherwise whatever the engine's
/// `schedule` call resolves to (presumably null after a final failure —
/// TODO confirm against the engine).
///
/// Throws a [StateError] if a task with the same id is currently in the
/// [LxWaiting] state.
Future<T?> runTask<T>(
  Future<T> Function() task, {
  String? id,
  TaskPriority priority = TaskPriority.normal,
  int retries = 0,
  Duration? retryDelay,
  bool useExponentialBackoff = true,
  double weight = 1.0,
  void Function(Object error, StackTrace stackTrace)? onError,
  TaskCachePolicy<T>? cachePolicy,
}) async {
  final taskId = id ?? _generateTaskId();
  // Resolved once so the read and write paths always agree on the key.
  final cacheKey = cachePolicy?.key ?? taskId;

  if (cachePolicy != null) {
    final cachedJson = await taskCacheProvider.read(cacheKey);

    if (cachedJson != null) {
      try {
        // The casts live inside the try on purpose: a persisted entry with a
        // missing or wrongly-typed field is just another unreadable entry and
        // must not crash the caller.
        final expiresAt = DateTime.fromMillisecondsSinceEpoch(
          cachedJson['expiresAt'] as int,
        );
        if (DateTime.now().isBefore(expiresAt)) {
          final data = cachedJson['data'] as Map<String, dynamic>;
          final result = cachePolicy.fromJson(data);
          // Update reactive state for cache hit
          tasks[taskId] = LxSuccess<T>(result);
          _scheduleCleanup(taskId);
          return result;
        }
      } catch (_) {
        // Malformed or undeserializable entry: fall through to eviction.
      }
      // Reached only when the entry is expired or unreadable.
      await taskCacheProvider.delete(cacheKey);
    }
  }

  // Check for duplicate task ID
  if (tasks.containsKey(taskId) && tasks[taskId] is LxWaiting) {
    throw StateError('Task with id "$taskId" is already running. '
        'Use a unique ID or cancel the existing task first.');
  }

  // A previous run with this id may still have a pending cleanup timer.
  _cleanupTimers[taskId]?.cancel();
  _cleanupTimers.remove(taskId);

  // Prune history: evict the first entry that is not still waiting. A nullable
  // local (rather than an empty-string sentinel) keeps a task whose id is ''
  // evictable.
  if (tasks.length >= maxTaskHistory && !tasks.containsKey(taskId)) {
    String? keyToRemove;
    for (final key in tasks.keys) {
      if (tasks[key] is! LxWaiting) {
        keyToRemove = key;
        break;
      }
    }
    // clearTask runs after the loop so the map is not mutated mid-iteration.
    if (keyToRemove != null) {
      clearTask(keyToRemove);
    }
  }

  // Initialize status
  tasks[taskId] = LxWaiting<dynamic>();
  taskWeights[taskId] = weight;
  taskProgress[taskId] = 0.0;

  // Wraps the user task so the success state is recorded as soon as it
  // completes. Failures propagate untouched; they are handled by the
  // `onError` callback passed to `schedule` below.
  Future<T> wrappedTask() async {
    final result = await task();
    tasks[taskId] = LxSuccess<T>(result);
    _scheduleCleanup(taskId);
    return result;
  }

  final result = await _reactiveTasksEngine.schedule(
    task: wrappedTask,
    id: taskId,
    priority: priority,
    retries: retries,
    retryDelay: retryDelay,
    useExponentialBackoff: useExponentialBackoff,
    onError: (e, s) {
      // This callback is invoked by engine only on FINAL failure
      tasks[taskId] = LxError<Object>(e, s, tasks[taskId]?.lastValue);
      final handler = onError ?? onTaskError;
      handler?.call(e, s);
      _scheduleCleanup(taskId);
    },
  );

  // Only non-null results are cached.
  if (cachePolicy != null && result != null) {
    await taskCacheProvider.write(cacheKey, {
      'expiresAt': DateTime.now().add(cachePolicy.ttl).millisecondsSinceEpoch,
      'data': cachePolicy.toJson(result),
    });
  }

  return result;
}