bg_orchestrator 1.0.4
bg_orchestrator: ^1.0.4 copied to clipboard
Production-grade background task orchestrator for Flutter with task chaining, middleware, timeouts, cron scheduling, batching, encryption, and persistent services.
bg_orchestrator #
A production-grade, cross-platform background task orchestrator for Flutter. Unifies Android WorkManager and iOS BGTaskScheduler behind a single, elegant Dart API.
Chain tasks. Retry with backoff. Monitor progress. Encrypt data. Limit concurrency. All with one API.
Why bg_orchestrator? #
Flutter developers currently juggle 3-4 fragile packages with no task chaining, progress reporting, or iOS parity. bg_orchestrator provides everything out of the box.
| Feature | bg_orchestrator | workmanager | flutter_background_service |
|---|---|---|---|
| Unified API | ✅ | ❌ | ❌ |
| Task Chaining | ✅ | ❌ | ❌ |
| Progress Reporting | ✅ | ❌ | ❌ |
| Middleware | ✅ | ❌ | ❌ |
| Timeouts | ✅ | ❌ | ⚠️ |
| History Logs | ✅ | ❌ | ❌ |
| Cron Scheduling | ✅ | ❌ | ❌ |
| Batching | ✅ | ❌ | ❌ |
| Concurrency Control | ✅ | ❌ | ❌ |
| Rate Limiting | ✅ | ❌ | ❌ |
| Priority Queues | ✅ | ❌ | ❌ |
| Encryption | ✅ | ❌ | ❌ |
| iOS Parity | ✅ | ⚠️ Limited | ❌ |
Quick Start #
Installation #
dependencies:
bg_orchestrator: ^1.0.4
iOS Setup #
Add to ios/Runner/Info.plist:
<key>BGTaskSchedulerPermittedIdentifiers</key>
<array>
<string>dev.taskflow.refresh</string>
<string>dev.taskflow.processing</string>
</array>
<key>UIBackgroundModes</key>
<array>
<string>fetch</string>
<string>processing</string>
</array>
Initialize #
void main() async {
WidgetsFlutterBinding.ensureInitialized();
// Register handlers
TaskFlow.registerHandler('syncData', (ctx) async {
final userId = ctx.input['userId'] as String;
await myApi.sync(userId);
return TaskResult.success(data: {'synced': true});
});
await TaskFlow.initialize();
runApp(MyApp());
}
Core Features #
1. Task Enqueueing #
Basic task:
final id = await TaskFlow.enqueue(
'syncData',
input: {'userId': '123'},
);
With timeout, dedup, concurrency, rate limit, priority queue:
final id = await TaskFlow.enqueue(
'syncData',
input: {'userId': '123'},
timeout: TaskTimeout.moderate, // 30s warn, 60s kill
dedupPolicy: DedupPolicy.byInput( // Don't duplicate in 5 min
ttl: Duration(minutes: 5),
),
concurrency: ConcurrencyControl.limited, // Max 3 concurrent
rateLimit: RateLimit.moderate, // 10 executions/min
queue: TaskQueue.high, // 10x priority weight
encryption: TaskEncryption.aes256, // Encrypt sensitive data
);
2. Task Handlers #
Simple handler:
TaskFlow.registerHandler('syncData', (ctx) async {
final userId = ctx.input['userId'] as String;
await myApi.sync(userId);
return TaskResult.success(data: {'synced': true});
});
With progress reporting:
TaskFlow.registerHandler('downloadFile', (ctx) async {
final url = ctx.input['url'] as String;
final file = File('/tmp/download');
int bytesDownloaded = 0;
int totalBytes = 1000000;
stream.listen((chunk) {
bytesDownloaded += chunk.length;
ctx.reportProgress(bytesDownloaded / totalBytes);
});
return TaskResult.success(data: {'path': file.path});
});
3. Task Chaining #
Sequential execution with data passing:
await TaskFlow.chain('processPayment')
.then('validatePayment')
.then('processPayment')
.then('sendConfirmation')
.enqueue(input: {'amount': 500.00});
Parallel steps:
await TaskFlow.chain('multiSync')
.then('fetchUsers')
.thenAll(['syncContacts', 'syncCalendar', 'syncNotes']) // 3 in parallel
.then('cleanup')
.enqueue();
With constraints and retry:
await TaskFlow.chain('securePipeline')
.then('step1')
.then('step2')
.withConstraints(TaskConstraints(network: NetworkConstraint.connected))
.withRetry(RetryPolicy.exponential(maxAttempts: 3, initialDelay: Duration(seconds: 5)))
.enqueue();
4. Advanced Scheduling #
Periodic task (15+ minutes on Android):
await TaskFlow.schedule(
'syncData',
interval: Duration(hours: 1),
constraints: TaskConstraints(network: NetworkConstraint.unmetered),
);
Cron expression (5-field syntax):
await TaskFlow.schedule(
'dailyReport',
cron: CronSchedule.daily(hour: 9), // Every day at 9am
);
// Or custom cron
await TaskFlow.schedule(
'weeklyCleanup',
cron: CronSchedule.cron('0 2 * * MON'), // Every Monday at 2am
);
Time window (restrict to specific hours):
await TaskFlow.schedule(
'sync',
interval: Duration(hours: 1),
window: TimeWindow.offPeak, // Only 2am-5am
);
// Or custom window
await TaskFlow.schedule(
'upload',
interval: Duration(minutes: 30),
window: TimeWindow(
startHour: 9,
endHour: 17,
daysOfWeek: [1, 2, 3, 4, 5], // Weekdays only
),
);
5. Monitoring Progress #
Monitor by execution ID:
TaskFlow.monitorExecution(id).listen((status) {
switch (status) {
case TaskQueued():
print('Waiting to run');
case TaskRunning(:final progress):
print('${(progress * 100).toInt()}% complete');
case TaskSucceeded(:final output):
print('Done! ${output}');
case TaskFailed(:final error):
print('Failed: $error');
case TaskRetrying(:final attempt):
print('Retrying attempt $attempt...');
case TaskCancelled():
print('Cancelled');
}
});
In UI with StreamBuilder:
StreamBuilder<TaskStatus>(
stream: TaskFlow.monitorExecution(executionId),
builder: (context, snapshot) {
if (!snapshot.hasData) return CircularProgressIndicator();
final status = snapshot.data!;
return switch (status) {
TaskRunning(:final progress) => LinearProgressIndicator(value: progress),
TaskSucceeded() => Text('✓ Complete'),
TaskFailed(:final error) => Text('✗ $error'),
_ => Text('Status: $status'),
};
},
)
6. Middleware & Interceptors #
Add logging, auth refresh, analytics to every task:
class LoggingMiddleware extends TaskMiddleware {
@override
Future<TaskResult> execute(String taskName, TaskContext ctx,
Future<TaskResult> Function() next) async {
print('📋 $taskName started');
final result = await next();
print('✅ $taskName completed: ${result.status}');
return result;
}
}
TaskFlow.use(LoggingMiddleware());
Chain multiple middleware:
TaskFlow.use(LoggingMiddleware());
TaskFlow.use(AuthRefreshMiddleware());
TaskFlow.use(AnalyticsMiddleware());
7. Timeouts (Soft & Hard) #
Warn at 30s, kill at 60s:
const timeout = TaskTimeout(
soft: Duration(seconds: 30), // Warning callback
hard: Duration(minutes: 1), // Force termination
onSoftTimeout: (executionId) => print('⚠️ Task slow!'),
);
await TaskFlow.enqueue('longTask', timeout: timeout);
Use presets:
TaskTimeout.quick // 45s warn, 60s kill (API calls)
TaskTimeout.moderate // 4min warn, 5min kill (default)
TaskTimeout.extended // 25min warn, 30min kill (long ops)
8. Execution History & Debugging #
Get task execution history:
final history = await TaskFlow.getHistory('syncData', limit: 20);
for (final entry in history) {
print('${entry.taskName}: ${entry.status} in ${entry.durationMs}ms');
if (entry.error != null) print(' Error: ${entry.error}');
}
Query by status or date:
final failed = await TaskFlow.getHistory('syncData', status: 'failed');
final recent = await TaskFlow.getHistory(
'syncData',
sinceDate: DateTime.now().subtract(Duration(days: 1)),
);
9. Lifecycle Hooks #
Global callbacks for Sentry/Crashlytics integration:
TaskFlow.onTaskStart((entry) {
metrics.recordTaskStart(entry.taskName);
});
TaskFlow.onTaskComplete((entry) {
metrics.recordTaskComplete(entry.taskName, entry.durationMs);
});
TaskFlow.onTaskFailed((entry) {
Sentry.captureException(Exception(entry.error));
});
TaskFlow.onChainComplete((chainId, status) {
print('Chain $chainId: $status');
});
10. Batching Operations #
Enqueue 100+ items as one trackable unit:
final batch = await TaskFlow.batch(
'uploadPhotos',
items: [photo1, photo2, photo3, ...photo100],
handler: (ctx, photo) async {
await uploadToServer(photo);
return TaskResult.success();
},
);
batch
.then((results) => print('All ${results.length} uploaded!'))
.catch((error) => print('Upload failed: $error'))
.finally_(() => print('Done'));
11. Concurrency Control #
Limit parallel executions:
// Max 3 uploads at once
await TaskFlow.enqueue(
'uploadFile',
concurrency: ConcurrencyControl.limited,
);
// Or strategies: FIFO, LIFO, random, byPriority
await TaskFlow.enqueue(
'uploadFile',
concurrency: ConcurrencyControl(
maxConcurrent: 5,
strategy: ConcurrencyStrategy.byPriority,
),
);
12. Rate Limiting #
Throttle background jobs:
// Max 10 API calls per minute
const limit = RateLimit(
maxExecutions: 10,
window: Duration(minutes: 1),
);
await TaskFlow.enqueue('apiCall', rateLimit: limit);
Use presets:
RateLimit.conservative // 5/min
RateLimit.moderate // 10/min
RateLimit.aggressive // 50/min
RateLimit.hourly // 100/hour
13. Priority Queues #
Ensure payments complete before analytics:
await TaskFlow.enqueue('processPayment', queue: TaskQueue.critical); // 100x
await TaskFlow.enqueue('trackAnalytics', queue: TaskQueue.low); // 0.1x
Queue weights:
critical: 100x (payments, critical ops)high: 10x (uploads, downloads, user data)default: 1x (routine syncs, background updates)low: 0.1x (analytics, logging, batch ops)
14. Task Deduplication #
Prevent duplicate enqueues:
// Don't enqueue syncUser:123 if already queued within 5 minutes
await TaskFlow.enqueue(
'syncUser',
input: {'userId': '123'},
dedupPolicy: DedupPolicy.byInput(ttl: Duration(minutes: 5)),
);
// Only deduplicate by userId field (ignore 'force' field)
await TaskFlow.enqueue(
'syncUser',
input: {'userId': '123', 'force': true},
dedupPolicy: DedupPolicy.byFields(
ttl: Duration(minutes: 5),
fields: ['userId'],
),
);
15. Encryption #
Encrypt sensitive task data:
await TaskFlow.enqueue(
'processPayment',
input: {'cardNumber': '4532-1111-2222-3333'},
encryption: TaskEncryption.aes256, // AES-256-GCM at rest
);
Keys stored securely:
- Android: Keystore
- iOS: Keychain
16. Persistent Services #
Always-on foreground services (GPS, WebSocket, BLE):
await TaskFlow.startService(
'liveTracking',
notificationTitle: '🚗 Ride in Progress',
notificationBody: 'Your location is being shared',
updateInterval: Duration(seconds: 10),
);
// Simulate location updates
await TaskFlow.sendToService('tracking', {
'command': 'updateLocation',
'lat': 12.9716,
'lng': 77.5946,
});
// Listen for service events
TaskFlow.onServiceEvent('tracking').listen((event) {
print('Service update: $event');
});
// Stop service
await TaskFlow.stopService('liveTracking');
Retry Policies #
Exponential backoff (recommended):
RetryPolicy.exponential(
maxAttempts: 5,
initialDelay: Duration(seconds: 10),
maxDelay: Duration(hours: 1),
multiplier: 2.0,
jitter: true,
)
Linear backoff:
RetryPolicy.linear(
maxAttempts: 3,
delay: Duration(seconds: 30),
jitter: true,
)
Custom backoff:
RetryPolicy.custom(
maxAttempts: 4,
delayForAttempt: (attempt) {
if (attempt == 1) return Duration(seconds: 5);
if (attempt == 2) return Duration(seconds: 15);
if (attempt == 3) return Duration(seconds: 60);
return Duration(minutes: 5);
},
)
Constraints #
Control when tasks execute:
TaskConstraints(
network: NetworkConstraint.connected, // .unmetered, .none
batteryNotLow: true,
requiresCharging: false,
deviceIdle: false,
)
Cancellation #
// Cancel by name (all executions)
await TaskFlow.cancel('syncData');
// Cancel specific execution
await TaskFlow.cancelExecution(executionId);
// Cancel entire chain
await TaskFlow.cancelChain(chainId);
// Cancel by tag
await TaskFlow.cancelByTag('sync');
// Cancel everything
await TaskFlow.cancelAll();
Querying #
// Get current status
final status = await TaskFlow.getStatus('syncData');
// Get all tasks
final allTasks = await TaskFlow.getAllTasks();
// Get tasks by tag
final syncTasks = await TaskFlow.getTasksByTag('sync');
Complete Example #
See example/lib/main.dart for a comprehensive app demonstrating:
- All execution modes (deferrable, periodic, expedited, persistent)
- Task chaining with sequential and parallel steps
- Real-time progress monitoring
- Activity logging and results display
- All v1.0-v2.0 features in action
Platform Support #
- Android — WorkManager 2.10.1+, minSdk 24
- iOS — BGTaskScheduler, iOS 13.0+
- Web — Not supported (different background APIs)
Note: The Dart API is fully implemented. Native platform implementations (Kotlin/Swift) for advanced features are in progress. Core task scheduling works on physical devices.
Example app demonstrates the complete API surface for reference. For production use on real devices, implement the corresponding native methods in your platform channels.
API Reference #
| Method | Purpose |
|---|---|
TaskFlow.initialize() |
Initialize (call once in main) |
TaskFlow.registerHandler() |
Register task handler |
TaskFlow.enqueue() |
Enqueue one-off task |
TaskFlow.chain() |
Start task chain builder |
TaskFlow.schedule() |
Schedule periodic task |
TaskFlow.reschedule() |
Update schedule interval |
TaskFlow.unschedule() |
Stop periodic task |
TaskFlow.monitor() |
Monitor by task name |
TaskFlow.monitorExecution() |
Monitor by execution ID |
TaskFlow.cancel() |
Cancel by name |
TaskFlow.cancelExecution() |
Cancel specific execution |
TaskFlow.cancelChain() |
Cancel entire chain |
TaskFlow.cancelByTag() |
Cancel by tag |
TaskFlow.cancelAll() |
Cancel all tasks |
TaskFlow.getStatus() |
Get current status |
TaskFlow.getAllTasks() |
Get all tasks |
TaskFlow.getTasksByTag() |
Get tasks by tag |
TaskFlow.getHistory() |
Get execution history |
TaskFlow.startService() |
Start persistent service |
TaskFlow.stopService() |
Stop persistent service |
TaskFlow.sendToService() |
Send command to service |
TaskFlow.onServiceEvent() |
Listen for service events |
Limitations & Notes #
Android #
- Minimum periodic interval: 15 minutes (WorkManager)
- Task inputs/outputs limited to 10 KB (WorkManager)
- Chaining is native via WorkManager's WorkContinuation
iOS #
- Task chaining is simulated using UserDefaults (best-effort)
- System controls actual execution timing
- Background execution limited; tasks may be interrupted
General #
- Task input/output must be JSON-serializable (
Map<String, dynamic>) - Handlers must be top-level (required for background isolates)
- Dispatcher function requires
@pragma('vm:entry-point')annotation
Contributing #
Issues and PRs welcome! bg_orchestrator aims to be the simplest, most reliable background task solution for Flutter.
License #
MIT — See LICENSE file