philiprehberger_sync_engine 0.2.0 copy "philiprehberger_sync_engine: ^0.2.0" to clipboard
philiprehberger_sync_engine: ^0.2.0 copied to clipboard

Offline-first data sync with conflict resolution, retry queues, and local caching

philiprehberger_sync_engine #

Tests | pub package | Last updated

Offline-first data sync with conflict resolution, retry queues, and local caching

Requirements #

  • Dart >= 3.5

Installation #

Add to your pubspec.yaml:

dependencies:
  philiprehberger_sync_engine: ^0.2.0

Then run:

dart pub get

Usage #

import 'package:philiprehberger_sync_engine/sync_engine.dart';

final store = LocalStore();
final resolver = ConflictResolver(strategy: ConflictStrategy.latestWins);
final engine = SyncEngine(store: store, resolver: resolver);

Adding Records #

store.put(SyncRecord(
  id: 'user-1',
  data: {'name': 'Alice', 'email': 'alice@example.com'},
  updatedAt: DateTime.now(),
));

Running a Sync #

final result = await engine.sync(
  push: (records) async {
    // Send the records to your API and return the ids of those that were pushed successfully
    final response = await api.push(records);
    return response.successIds;
  },
  pull: () async {
    // Fetch records from your API
    return await api.fetchAll();
  },
);

print('Pushed: ${result.pushed}, Pulled: ${result.pulled}');

Conflict Resolution #

// Remote always wins
final resolver = ConflictResolver(strategy: ConflictStrategy.remoteWins);

// Local always wins
final resolver = ConflictResolver(strategy: ConflictStrategy.localWins);

// Latest timestamp wins
final resolver = ConflictResolver(strategy: ConflictStrategy.latestWins);

// Custom logic
final resolver = ConflictResolver(
  strategy: ConflictStrategy.custom,
  customResolver: (local, remote) => remote.version > local.version ? remote : local,
);

Progress Tracking #

await engine.sync(
  push: pushFn,
  pull: pullFn,
  onProgress: (completed, total) {
    print('$completed / $total');
  },
);

Tagging Records #

store.put(SyncRecord(
  id: 'user-1',
  data: {'name': 'Alice'},
  updatedAt: DateTime.now(),
  tags: {'user', 'priority'},
));

// Query by tag
final userRecords = store.queryByTag('user');

// Update tags on an existing record (`record` is a SyncRecord you already hold; withTags returns a copy)
final updated = record.withTags({'user', 'priority', 'reviewed'});
store.put(updated);

Selective Sync #

// Only push records tagged with 'priority'
final result = await engine.syncWhere(
  (record) => record.tags.contains('priority'),
  push: (records) async {
    final response = await api.push(records);
    return response.successIds;
  },
  pull: () async => await api.fetchAll(),
);

Exponential Backoff #

final retryQueue = RetryQueue(
  maxAttempts: 5,
  backoffBase: Duration(seconds: 1),
  backoffMultiplier: 2.0,
);

// Calculate delay for a given attempt
final delay = retryQueue.nextDelay(2); // 4 seconds (1 * 2^2)

Sync Metadata #

await engine.sync(push: pushFn, pull: pullFn);

final meta = engine.metadata;
print('Last sync: ${meta.lastSyncAt}');
print('Duration: ${meta.lastDuration}');
print('Total pushes: ${meta.totalPushes}');
print('Total pulls: ${meta.totalPulls}');
print('Total conflicts: ${meta.totalConflicts}');

// Serialize for persistence
final json = meta.toJson();
final restored = SyncMetadata.fromJson(json);

Querying Records #

final active = store.query(where: (r) => r.status == SyncStatus.synced);
final stats = store.statistics();
print('Total: ${stats.total}, Synced: ${stats.synced}');

API #

| Class | Method / Property | Description |
|-------|-------------------|-------------|
| `SyncRecord` | `SyncRecord(id:, data:, updatedAt:)` | Create a sync record |
| `SyncRecord` | `withStatus(status)` | Copy with new status |
| `SyncRecord` | `withTags(tags)` | Copy with new tags |
| `SyncRecord` | `incrementVersion()` | Copy with version + 1 |
| `SyncRecord` | `tags` | Tags for categorizing the record |
| `LocalStore` | `put(record)` | Store a record |
| `LocalStore` | `putAll(records)` | Store multiple records |
| `LocalStore` | `get(id)` | Retrieve a record by id |
| `LocalStore` | `remove(id)` | Remove a record |
| `LocalStore` | `all()` | Get all records |
| `LocalStore` | `pending()` | Get pending and modified records |
| `LocalStore` | `markSynced(id)` | Mark record as synced |
| `LocalStore` | `markModified(id)` | Mark record as modified |
| `LocalStore` | `query(where:)` | Filter records with a predicate |
| `LocalStore` | `queryByTag(tag)` | Filter records by tag |
| `LocalStore` | `statistics()` | Get aggregate counts |
| `LocalStore` | `count` | Total record count |
| `LocalStore` | `clear()` | Remove all records |
| `ConflictResolver` | `resolve(local, remote)` | Resolve a conflict |
| `ConflictResolver` | `resolvedCount` | Number of conflicts resolved |
| `RetryQueue` | `enqueue(record)` | Add a record to retry |
| `RetryQueue` | `dequeueAll()` | Get and clear all retryable records |
| `RetryQueue` | `pending()` | View queued records |
| `RetryQueue` | `nextDelay(attempt)` | Calculate backoff delay for attempt |
| `RetryQueue` | `count` | Number of queued records |
| `RetryQueue` | `clear()` | Clear the queue |
| `SyncEngine` | `sync(push:, pull:, onProgress:)` | Run a sync cycle |
| `SyncEngine` | `syncWhere(predicate, push:, pull:, onProgress:)` | Selective sync by predicate |
| `SyncEngine` | `metadata` | Cumulative sync statistics |
| `SyncEngine` | `lastSyncResult` | Result of the last sync |
| `SyncEngine` | `isSyncing` | Whether a sync is in progress |
| `SyncMetadata` | `lastSyncAt`, `lastDuration` | Timing of last sync |
| `SyncMetadata` | `totalPushes`, `totalPulls`, `totalConflicts` | Cumulative counts |
| `SyncMetadata` | `copyWith(...)` | Copy with updated fields |
| `SyncMetadata` | `toJson()` / `fromJson(map)` | JSON serialization |
| `SyncResult` | `pushed`, `pulled`, `conflicts`, `retried` | Individual counts |
| `SyncResult` | `total` | Sum of all counts |

Development #

dart pub get
dart analyze --fatal-infos
dart test

Support #

If you find this project useful:

⭐ Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License #

MIT

2
likes
160
points
0
downloads

Documentation

API reference

Publisher

verified publisher philiprehberger.com

Weekly Downloads

Offline-first data sync with conflict resolution, retry queues, and local caching

Homepage
Repository (GitHub)
View/report issues

Topics

#sync #offline #cache

License

MIT (license)

More

Packages that depend on philiprehberger_sync_engine