runTask<T> method

Future<T?> runTask<T>(
  Future<T> Function() task, {
  String? id,
  TaskPriority priority = TaskPriority.normal,
  int retries = 0,
  Duration? retryDelay,
  bool useExponentialBackoff = true,
  double weight = 1.0,
  void Function(Object error, StackTrace stackTrace)? onError,
  TaskCachePolicy<T>? cachePolicy,
})
Executes a task and automatically tracks its status in tasks.
- task: The async function to execute.
- id: A unique ID for the task (optional, generated if null).
- priority: Task priority.
- retries: Number of retry attempts.
- retryDelay: Delay between retries.
- useExponentialBackoff: Whether to use an exponential backoff strategy between retries.
- weight: The weight of this task in the totalProgress calculation (default 1.0).
- onError: Custom error handler.
- cachePolicy: If provided, the task result will be cached and reused until it expires.
Implementation
/// Executes [task] and automatically tracks its status in [tasks].
///
/// When [cachePolicy] is provided, a non-expired cached value is deserialized
/// and returned without running [task]; otherwise the task is scheduled on the
/// engine and a non-null result is written back to the cache afterwards.
///
/// - [task]: The async function to execute.
/// - [id]: A unique ID for the task (generated via [_generateTaskId] if null).
/// - [priority]: Scheduling priority handed to the task engine.
/// - [retries]: Number of retry attempts on failure.
/// - [retryDelay]: Delay between retries.
/// - [useExponentialBackoff]: Whether retry delays grow exponentially.
/// - [weight]: The weight of this task in the totalProgress calculation.
/// - [onError]: Custom error handler; falls back to [onTaskError] when null.
/// - [cachePolicy]: Enables read-through/write-through caching of the result.
///
/// Throws [StateError] when a task with the same [id] is still running.
///
/// NOTE(review): the return value is presumably `null` when the engine gives
/// up after the final retry — confirm against [_reactiveTasksEngine.schedule].
Future<T?> runTask<T>(
  Future<T> Function() task, {
  String? id,
  TaskPriority priority = TaskPriority.normal,
  int retries = 0,
  Duration? retryDelay,
  bool useExponentialBackoff = true,
  double weight = 1.0,
  void Function(Object error, StackTrace stackTrace)? onError,
  TaskCachePolicy<T>? cachePolicy,
}) async {
  final taskId = id ?? _generateTaskId();

  // Read-through cache: short-circuit with the cached value when an entry is
  // present, not yet expired, and deserializes cleanly.
  if (cachePolicy != null) {
    final cacheKey = cachePolicy.key ?? taskId;
    final cachedJson = await taskCacheProvider.read(cacheKey);
    if (cachedJson != null) {
      final expiresAt =
          DateTime.fromMillisecondsSinceEpoch(cachedJson['expiresAt'] as int);
      if (DateTime.now().isBefore(expiresAt)) {
        final data = cachedJson['data'] as Map<String, dynamic>;
        try {
          final result = cachePolicy.fromJson(data);
          // Update reactive state for the cache hit so observers see success
          // exactly as if the task had run.
          tasks[taskId] = LxSuccess<T>(result);
          _scheduleCleanup(taskId);
          return result;
        } catch (e) {
          // Deserialization failed: evict the corrupt entry and fall through
          // to a normal run.
          await taskCacheProvider.delete(cacheKey);
        }
      } else {
        // Entry expired: evict it and fall through to a normal run.
        await taskCacheProvider.delete(cacheKey);
      }
    }
  }

  // Reject a duplicate ID while a task with that ID is still in flight.
  if (tasks.containsKey(taskId) && tasks[taskId] is LxWaiting) {
    throw StateError('Task with id "$taskId" is already running. '
        'Use a unique ID or cancel the existing task first.');
  }

  // Re-running an ID cancels any pending auto-cleanup scheduled for its
  // previous (finished) entry.
  _cleanupTimers[taskId]?.cancel();
  _cleanupTimers.remove(taskId);

  // Prune history: when at capacity, evict one finished (non-waiting) task
  // rather than blindly removing the first key, so in-flight tasks survive.
  if (tasks.length >= maxTaskHistory && !tasks.containsKey(taskId)) {
    final keyToRemove = tasks.keys.firstWhere(
      (k) => tasks[k] is! LxWaiting,
      orElse: () => '',
    );
    if (keyToRemove.isNotEmpty) {
      clearTask(keyToRemove);
    }
  }

  // Initialize reactive status/weight/progress before scheduling so observers
  // can show the task as pending immediately.
  tasks[taskId] = LxWaiting<dynamic>();
  taskWeights[taskId] = weight;
  taskProgress[taskId] = 0.0;

  // Wrap execution so reactive state is updated at the moment the task
  // actually runs (the engine may queue it behind higher-priority work).
  Future<T> wrappedTask() async {
    try {
      final result = await task();
      tasks[taskId] = LxSuccess<T>(result);
      _scheduleCleanup(taskId);
      return result;
    } catch (e) {
      // Deliberately rethrown untouched: the engine owns the retry logic and
      // invokes the `onError` callback below only on FINAL failure.
      rethrow;
    }
  }

  final result = await _reactiveTasksEngine.schedule(
    task: wrappedTask,
    id: taskId,
    priority: priority,
    retries: retries,
    retryDelay: retryDelay,
    useExponentialBackoff: useExponentialBackoff,
    onError: (e, s) {
      // Invoked by the engine only on FINAL failure (after all retries).
      // The previous state's `lastValue` is carried into the error state.
      tasks[taskId] = LxError<Object>(e, s, tasks[taskId]?.lastValue);
      final handler = onError ?? onTaskError;
      handler?.call(e, s);
      _scheduleCleanup(taskId);
    },
  );

  // Write-through cache: persist a non-null result together with its
  // absolute expiry timestamp (now + ttl, in epoch milliseconds).
  if (cachePolicy != null && result != null) {
    final cacheKey = cachePolicy.key ?? taskId;
    await taskCacheProvider.write(cacheKey, {
      'expiresAt': DateTime.now().add(cachePolicy.ttl).millisecondsSinceEpoch,
      'data': cachePolicy.toJson(result),
    });
  }
  return result;
}