philiprehberger_sync_engine 0.4.0 copy "philiprehberger_sync_engine: ^0.4.0" to clipboard
philiprehberger_sync_engine: ^0.4.0 copied to clipboard

Offline-first data sync with conflict resolution, retry queues, and local caching

philiprehberger_sync_engine #

Tests pub package Last updated

Offline-first data sync with conflict resolution, retry queues, and local caching

Requirements #

  • Dart >= 3.6

Installation #

Add to your pubspec.yaml:

dependencies:
  philiprehberger_sync_engine: ^0.4.0

Then run:

dart pub get

Usage #

import 'package:philiprehberger_sync_engine/sync_engine.dart';

final store = LocalStore();
final resolver = ConflictResolver(strategy: ConflictStrategy.latestWins);
final engine = SyncEngine(store: store, resolver: resolver);

Adding Records #

store.put(SyncRecord(
  id: 'user-1',
  data: {'name': 'Alice', 'email': 'alice@example.com'},
  updatedAt: DateTime.now(),
));

Running a Sync #

final result = await engine.sync(
  push: (records) async {
    // Send records to your API, return ids of successfully pushed records
    final response = await api.push(records);
    return response.successIds;
  },
  pull: () async {
    // Fetch records from your API
    return await api.fetchAll();
  },
);

print('Pushed: ${result.pushed}, Pulled: ${result.pulled}');

Conflict Resolution #

// Remote always wins
final resolver = ConflictResolver(strategy: ConflictStrategy.remoteWins);

// Local always wins
final resolver = ConflictResolver(strategy: ConflictStrategy.localWins);

// Latest timestamp wins
final resolver = ConflictResolver(strategy: ConflictStrategy.latestWins);

// Custom logic
final resolver = ConflictResolver(
  strategy: ConflictStrategy.custom,
  customResolver: (local, remote) => remote.version > local.version ? remote : local,
);

Progress Tracking #

await engine.sync(
  push: pushFn,
  pull: pullFn,
  onProgress: (completed, total) {
    print('$completed / $total');
  },
);

Tagging Records #

store.put(SyncRecord(
  id: 'user-1',
  data: {'name': 'Alice'},
  updatedAt: DateTime.now(),
  tags: {'user', 'priority'},
));

// Query by tag
final userRecords = store.queryByTag('user');

// Update tags on an existing record
final updated = record.withTags({'user', 'priority', 'reviewed'});
store.put(updated);

Selective Sync #

// Only push records tagged with 'priority'
final result = await engine.syncWhere(
  (record) => record.tags.contains('priority'),
  push: (records) async {
    final response = await api.push(records);
    return response.successIds;
  },
  pull: () async => await api.fetchAll(),
);

Exponential Backoff #

final retryQueue = RetryQueue(
  maxAttempts: 5,
  backoffBase: Duration(seconds: 1),
  backoffMultiplier: 2.0,
);

// Calculate delay for a given attempt
final delay = retryQueue.nextDelay(2); // 4 seconds (backoffBase * backoffMultiplier^attempt = 1s * 2^2)

Sync Metadata #

await engine.sync(push: pushFn, pull: pullFn);

final meta = engine.metadata;
print('Last sync: ${meta.lastSyncAt}');
print('Duration: ${meta.lastDuration}');
print('Total pushes: ${meta.totalPushes}');
print('Total pulls: ${meta.totalPulls}');
print('Total conflicts: ${meta.totalConflicts}');

// Serialize for persistence
final json = meta.toJson();
final restored = SyncMetadata.fromJson(json);

Sync Hooks #

engine.onBeforeSync = (pendingRecords) {
  print('About to sync ${pendingRecords.length} records');
  return true; // return false to cancel
};

engine.onAfterSync = (result) {
  print('Synced: ${result.pushed} pushed, ${result.pulled} pulled');
};

engine.onConflict = (local, remote) {
  print('Conflict on ${local.id}');
};

Error Handling #

final result = await engine.sync(push: pushFn, pull: pullFn);

if (result.hasErrors) {
  for (final error in result.errors) {
    print('${error.recordId}: ${error.message}');
  }
}

Serialization #

// Serialize a record to JSON
final json = record.toJson();

// Restore from JSON
final restored = SyncRecord.fromJson(json);

// Serialize sync results and statistics
final resultJson = result.toJson();
final statsJson = store.statistics().toJson();

Querying Records #

final active = store.query(where: (r) => r.status == SyncStatus.synced);
final stats = store.statistics();
print('Total: ${stats.total}, Synced: ${stats.synced}');

API #

| Class | Method / Property | Description |
|-------|-------------------|-------------|
| `SyncRecord` | `SyncRecord(id:, data:, updatedAt:)` | Create a sync record |
| `SyncRecord` | `withStatus(status)` | Copy with new status |
| `SyncRecord` | `withTags(tags)` | Copy with new tags |
| `SyncRecord` | `incrementVersion()` | Copy with version + 1 |
| `SyncRecord` | `withData(data)` | Copy with new data payload |
| `SyncRecord` | `tags` | Tags for categorizing the record |
| `SyncRecord` | `toJson()` | Serialize to JSON map |
| `SyncRecord` | `SyncRecord.fromJson(map)` | Deserialize from JSON map |
| `LocalStore` | `put(record)` | Store a record |
| `LocalStore` | `putAll(records)` | Store multiple records |
| `LocalStore` | `get(id)` | Retrieve a record by id |
| `LocalStore` | `remove(id)` | Remove a record |
| `LocalStore` | `all()` | Get all records |
| `LocalStore` | `pending()` | Get pending and modified records |
| `LocalStore` | `markSynced(id)` | Mark record as synced |
| `LocalStore` | `markModified(id)` | Mark record as modified |
| `LocalStore` | `query(where:)` | Filter records with a predicate |
| `LocalStore` | `queryByTag(tag)` | Filter records by tag |
| `LocalStore` | `statistics()` | Get aggregate counts |
| `LocalStore` | `count` | Total record count |
| `LocalStore` | `clear()` | Remove all records |
| `StoreStatistics` | `toJson()` | Serialize to JSON map |
| `StoreStatistics` | `StoreStatistics.fromJson(map)` | Deserialize from JSON map |
| `ConflictResolver` | `resolve(local, remote)` | Resolve a conflict |
| `ConflictResolver` | `resolvedCount` | Number of conflicts resolved |
| `RetryQueue` | `enqueue(record)` | Add a record to retry |
| `RetryQueue` | `dequeueAll()` | Get and clear all retryable records |
| `RetryQueue` | `pending()` | View queued records |
| `RetryQueue` | `nextDelay(attempt)` | Calculate backoff delay for attempt |
| `RetryQueue` | `count` | Number of queued records |
| `RetryQueue` | `clear()` | Clear the queue |
| `SyncEngine` | `sync(push:, pull:, onProgress:)` | Run a sync cycle |
| `SyncEngine` | `syncWhere(predicate, push:, pull:, onProgress:)` | Selective sync by predicate |
| `SyncEngine` | `metadata` | Cumulative sync statistics |
| `SyncEngine` | `lastSyncResult` | Result of the last sync |
| `SyncEngine` | `isSyncing` | Whether a sync is in progress |
| `SyncMetadata` | `lastSyncAt`, `lastDuration` | Timing of last sync |
| `SyncMetadata` | `totalPushes`, `totalPulls`, `totalConflicts` | Cumulative counts |
| `SyncMetadata` | `copyWith(...)` | Copy with updated fields |
| `SyncMetadata` | `toJson()` / `fromJson(map)` | JSON serialization |
| `SyncEngine` | `onBeforeSync` | Hook called before sync starts |
| `SyncEngine` | `onAfterSync` | Hook called after sync completes |
| `SyncEngine` | `onConflict` | Hook called on conflict detection |
| `SyncError` | `recordId`, `message`, `timestamp` | Error details for a failed record |
| `SyncError` | `toJson()` / `SyncError.fromJson(map)` | JSON serialization |
| `SyncResult` | `pushed`, `pulled`, `conflicts`, `retried` | Individual counts |
| `SyncResult` | `errors` | List of errors from the sync cycle |
| `SyncResult` | `hasErrors` | Whether any errors occurred |
| `SyncResult` | `total` | Sum of all counts |
| `SyncResult` | `toJson()` / `SyncResult.fromJson(map)` | JSON serialization |

Development #

dart pub get
dart analyze --fatal-infos
dart test

Support #

If you find this project useful:

⭐ Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License #

MIT

2
likes
160
points
21
downloads

Documentation

API reference

Publisher

verified publisher: philiprehberger.com

Weekly Downloads

Offline-first data sync with conflict resolution, retry queues, and local caching

Homepage
Repository (GitHub)
View/report issues

Topics

#sync #offline #cache

License

MIT (license)

More

Packages that depend on philiprehberger_sync_engine