philiprehberger_sync_engine
Offline-first data sync with conflict resolution, retry queues, and local caching
Requirements
- Dart >= 3.5
Installation
Add to your pubspec.yaml:
dependencies:
  philiprehberger_sync_engine: ^0.2.0
Then run:
dart pub get
Usage
import 'package:philiprehberger_sync_engine/sync_engine.dart';
final store = LocalStore();
final resolver = ConflictResolver(strategy: ConflictStrategy.latestWins);
final engine = SyncEngine(store: store, resolver: resolver);
Adding Records
store.put(SyncRecord(
  id: 'user-1',
  data: {'name': 'Alice', 'email': 'alice@example.com'},
  updatedAt: DateTime.now(),
));
Running a Sync
// `api` stands for your own API client; it is not part of this package.
final result = await engine.sync(
  push: (records) async {
    // Send records to your API and return the ids that were pushed successfully
    final response = await api.push(records);
    return response.successIds;
  },
  pull: () async {
    // Fetch records from your API
    return await api.fetchAll();
  },
);
print('Pushed: ${result.pushed}, Pulled: ${result.pulled}');
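The `api` object in the snippet above is left to the application. As a rough illustration of the callback contract (push returns the ids that succeeded, pull returns the remote records), here is a self-contained in-memory stand-in. `Rec`, `FakeApi`, and the exact parameter and return types are assumptions made for this sketch; the package's real types may differ.

```dart
// Hypothetical stand-ins for illustration; not part of the package.
class Rec {
  const Rec(this.id, this.data);
  final String id;
  final Map<String, Object?> data;
}

class FakeApi {
  final Map<String, Rec> _remote = {};

  /// Stores the given records and returns the ids that were accepted.
  /// Records with an empty id are rejected to simulate a partial failure.
  Future<List<String>> push(List<Rec> records) async {
    final accepted = <String>[];
    for (final record in records) {
      if (record.id.isEmpty) continue;
      _remote[record.id] = record;
      accepted.add(record.id);
    }
    return accepted;
  }

  /// Returns every record currently held on the fake remote.
  Future<List<Rec>> fetchAll() async => _remote.values.toList();
}
```

A stand-in like this can be handy in tests, since it exercises the partial-success path without a network.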
Conflict Resolution
// Remote always wins
final remoteWins = ConflictResolver(strategy: ConflictStrategy.remoteWins);
// Local always wins
final localWins = ConflictResolver(strategy: ConflictStrategy.localWins);
// Latest timestamp wins
final latestWins = ConflictResolver(strategy: ConflictStrategy.latestWins);
// Custom logic: keep whichever record has the higher version
final custom = ConflictResolver(
  strategy: ConflictStrategy.custom,
  customResolver: (local, remote) =>
      remote.version > local.version ? remote : local,
);
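To show what the three built-in strategies mean, here is a standalone sketch of the selection logic. It is not the package's implementation: `Rec`, `Strategy`, and `resolve` are hypothetical names, and the tie-breaking rule for equal timestamps is an assumption.

```dart
// Hypothetical minimal record; the package's SyncRecord has more fields.
class Rec {
  const Rec(this.id, this.version, this.updatedAt);
  final String id;
  final int version;
  final DateTime updatedAt;
}

enum Strategy { remoteWins, localWins, latestWins }

/// Picks the surviving record for the given strategy.
/// Under latestWins, equal timestamps keep the local copy (assumed here).
Rec resolve(Strategy strategy, Rec local, Rec remote) {
  return switch (strategy) {
    Strategy.remoteWins => remote,
    Strategy.localWins => local,
    Strategy.latestWins =>
      remote.updatedAt.isAfter(local.updatedAt) ? remote : local,
  };
}
```

Note that latestWins depends on device clocks being roughly in sync; a version-based custom resolver avoids that dependency.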
Progress Tracking
await engine.sync(
  push: pushFn,
  pull: pullFn,
  onProgress: (completed, total) {
    print('$completed / $total');
  },
);
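If you want a percentage rather than raw counts, a small helper along these lines could be called from `onProgress`. It assumes both arguments are integers, which the README does not state, and `formatProgress` is a hypothetical name.

```dart
/// Formats a progress update such as "3 / 10 (30%)".
String formatProgress(int completed, int total) {
  // Guard against division by zero when there is nothing to sync.
  final percent = total == 0 ? 100 : (completed * 100 / total).round();
  return '$completed / $total ($percent%)';
}
```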
Tagging Records
store.put(SyncRecord(
  id: 'user-1',
  data: {'name': 'Alice'},
  updatedAt: DateTime.now(),
  tags: {'user', 'priority'},
));
// Query by tag
final userRecords = store.queryByTag('user');
// Update tags on an existing record (`record` is a SyncRecord you already hold)
final updated = record.withTags({'user', 'priority', 'reviewed'});
store.put(updated);
Selective Sync
// Only push records tagged with 'priority'
final result = await engine.syncWhere(
  (record) => record.tags.contains('priority'),
  push: (records) async {
    final response = await api.push(records);
    return response.successIds;
  },
  pull: () async => await api.fetchAll(),
);
Exponential Backoff
final retryQueue = RetryQueue(
  maxAttempts: 5,
  backoffBase: Duration(seconds: 1),
  backoffMultiplier: 2.0,
);
// Calculate delay for a given attempt
final delay = retryQueue.nextDelay(2); // 4 seconds (1 * 2^2)
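The comment above implies the formula delay = backoffBase × backoffMultiplier^attempt. A standalone sketch of that arithmetic follows; `backoffDelay` is a hypothetical helper, and whether the package caps the delay or adds jitter is not stated in this README.

```dart
import 'dart:math' as math;

/// Delay before retry [attempt], computed as base * multiplier^attempt.
Duration backoffDelay(
  int attempt, {
  Duration base = const Duration(seconds: 1),
  double multiplier = 2.0,
}) {
  final factor = math.pow(multiplier, attempt);
  return Duration(microseconds: (base.inMicroseconds * factor).round());
}
```

With the defaults, attempts 0 through 4 give delays of 1, 2, 4, 8, and 16 seconds.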
Sync Metadata
await engine.sync(push: pushFn, pull: pullFn);
final meta = engine.metadata;
print('Last sync: ${meta.lastSyncAt}');
print('Duration: ${meta.lastDuration}');
print('Total pushes: ${meta.totalPushes}');
print('Total pulls: ${meta.totalPulls}');
print('Total conflicts: ${meta.totalConflicts}');
// Serialize for persistence
final json = meta.toJson();
final restored = SyncMetadata.fromJson(json);
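One way to persist the serialized metadata between app launches is to write it to a file. The sketch below uses only `dart:io` and `dart:convert`; `saveJson` and `loadJson` are hypothetical helpers, and it assumes `toJson()` returns a map that `jsonEncode` can handle directly. `dart:io` is not available on the web, so a different storage backend would be needed there.

```dart
import 'dart:convert';
import 'dart:io';

/// Writes a JSON-encodable map to [file], replacing any previous content.
Future<void> saveJson(File file, Map<String, dynamic> json) async {
  await file.writeAsString(jsonEncode(json));
}

/// Reads the map back, or returns null if the file does not exist yet.
Future<Map<String, dynamic>?> loadJson(File file) async {
  if (!await file.exists()) return null;
  return jsonDecode(await file.readAsString()) as Map<String, dynamic>;
}
```

After a sync you might call `saveJson(file, meta.toJson())`, and on startup pass a non-null result of `loadJson(file)` to `SyncMetadata.fromJson`.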
Querying Records
final synced = store.query(where: (r) => r.status == SyncStatus.synced);
final stats = store.statistics();
print('Total: ${stats.total}, Synced: ${stats.synced}');
API
| Class | Method / Property | Description |
|---|---|---|
| SyncRecord | SyncRecord(id:, data:, updatedAt:) | Create a sync record |
| SyncRecord | withStatus(status) | Copy with new status |
| SyncRecord | withTags(tags) | Copy with new tags |
| SyncRecord | incrementVersion() | Copy with version + 1 |
| SyncRecord | tags | Tags for categorizing the record |
| LocalStore | put(record) | Store a record |
| LocalStore | putAll(records) | Store multiple records |
| LocalStore | get(id) | Retrieve a record by id |
| LocalStore | remove(id) | Remove a record |
| LocalStore | all() | Get all records |
| LocalStore | pending() | Get pending and modified records |
| LocalStore | markSynced(id) | Mark record as synced |
| LocalStore | markModified(id) | Mark record as modified |
| LocalStore | query(where:) | Filter records with a predicate |
| LocalStore | queryByTag(tag) | Filter records by tag |
| LocalStore | statistics() | Get aggregate counts |
| LocalStore | count | Total record count |
| LocalStore | clear() | Remove all records |
| ConflictResolver | resolve(local, remote) | Resolve a conflict |
| ConflictResolver | resolvedCount | Number of conflicts resolved |
| RetryQueue | enqueue(record) | Add a record to retry |
| RetryQueue | dequeueAll() | Get and clear all retryable records |
| RetryQueue | pending() | View queued records |
| RetryQueue | nextDelay(attempt) | Calculate backoff delay for attempt |
| RetryQueue | count | Number of queued records |
| RetryQueue | clear() | Clear the queue |
| SyncEngine | sync(push:, pull:, onProgress:) | Run a sync cycle |
| SyncEngine | syncWhere(predicate, push:, pull:, onProgress:) | Selective sync by predicate |
| SyncEngine | metadata | Cumulative sync statistics |
| SyncEngine | lastSyncResult | Result of the last sync |
| SyncEngine | isSyncing | Whether a sync is in progress |
| SyncMetadata | lastSyncAt, lastDuration | Timing of last sync |
| SyncMetadata | totalPushes, totalPulls, totalConflicts | Cumulative counts |
| SyncMetadata | copyWith(...) | Copy with updated fields |
| SyncMetadata | toJson() / fromJson(map) | JSON serialization |
| SyncResult | pushed, pulled, conflicts, retried | Individual counts |
| SyncResult | total | Sum of all counts |
Development
dart pub get
dart analyze --fatal-infos
dart test
License