start method
void start()
Start the sync engine and begin synchronizing all configured collections.
This method:
- Validates all collection configurations
- Creates sync helpers for each collection
- Hydrates the persistent outbox (restores pending changes)
- Queues initial sync for objects marked as needing sync
- Sets up Socket.IO event listeners for server updates
- Starts monitoring local Realm changes
Call this once after construction. Subsequent calls are ignored.
Example:
final realmSync = RealmSync(realm: realm, socket: socket, ...);
realmSync.start(); // Start syncing
// Now write data and it will sync automatically
realm.writeWithSync(message, () {
message.text = "Hello!";
message.syncUpdateDb = true;
});
Important: Ensure Socket.IO is connected before calling start(),
or sync operations will queue until connection is established.
Implementation
/// Starts the sync engine and begins synchronizing all configured collections.
///
/// For every config in `configs` this method:
/// - validates the configuration and sample models up front (fail fast);
/// - builds a per-collection [SyncHelper] wired to this realm/socket/user;
/// - hydrates the persistent outbox, then flushes pending changes;
/// - queues a full sync for objects already flagged as needing sync;
/// - subscribes to local Realm change notifications to schedule diffs.
///
/// It also registers Socket.IO listeners for inbound data:
/// - 'sync:bootstrap'  — initial data load for a collection;
/// - 'sync:changes'    — incremental server changes, conflict-managed by
///   comparing UTC-millisecond timestamps (local newer/equal wins);
/// - 'connect'         — on (re)connect, requests changes missed since the
///   last known remote timestamp per collection.
///
/// Idempotent: the first call flips `_started`; subsequent calls return
/// immediately. Ensure the socket is connected before calling, otherwise
/// outbound operations queue until a connection is established.
void start() {
  if (_started) return;
  for (final cfg in configs) {
    // Fail fast on misconfiguration before any sync machinery starts.
    SyncValidator.validateConfig(cfg);
    SyncValidator.validateSampleModels(cfg);
    // Build one helper per collection. The callbacks below invoke the
    // config's typed closures via Function.apply because configs live in a
    // heterogeneous List<SyncCollectionConfig>, which erases the
    // per-collection generic type.
    final helper = SyncHelper(
      realm: realm,
      socket: socket,
      userId: userId,
      collectionName: cfg.collectionName,
      emitPreProcessor: cfg.emitPreProcessor,
      toSyncMapForId: (String id) {
        // Resolve a model's id via the config's selector; fall back to a
        // dynamic `.id` read if the erased-type invocation fails.
        String idFromModel(dynamic model) {
          try {
            return Function.apply(cfg.idSelector as Function, [model]);
          } catch (e) {
            // Fallback: assume the model has an id property; stringify it.
            final id = (model as dynamic).id;
            return id is String ? id : id.toString();
          }
        }

        // Linear scan to find the object by id (safe for all model types).
        for (final m in cfg.results) {
          if (idFromModel(m) == id) {
            // Prefer the user-provided mapper when present.
            if (cfg.toSyncMap != null) {
              return cfg.toSyncMap!(m as dynamic);
            }
            // RealmJson automatically tries toEJson() first, then falls
            // back to property-by-property serialization.
            final result = RealmJson.toJsonWith(
              m,
              cfg.propertyNames,
              embeddedProperties: cfg.embeddedProperties,
            );
            AppLogger.log(
              'toSyncMapForId($id) returning: ${result.keys.toList()}',
            );
            return result;
          }
        }
        return null; // Unknown id: nothing to sync.
      },
      // Default sanitizer strips the local-only dirty flag before emitting.
      sanitize:
          cfg.sanitize ??
          (Map<String, dynamic> src) {
            final out = Map<String, dynamic>.from(src);
            out.remove('sync_update_db');
            return out;
          },
      onAckSuccess: (String id) {
        // Server acknowledged the sync: clear the local dirty flag.
        realm.write(() {
          for (final m in cfg.results) {
            final objId = Function.apply(cfg.idSelector as Function, [m]);
            if (objId == id) {
              // User-provided callback first, then the automatic clear.
              cfg.applyAckSuccess?.call(m as dynamic);
              _clearSyncFlag(m);
              break;
            }
          }
        });
      },
      onNoDiff: (String id) {
        // Nothing changed versus baseline: still clear the dirty flag.
        realm.write(() {
          for (final m in cfg.results) {
            final objId = Function.apply(cfg.idSelector as Function, [m]);
            if (objId == id) {
              cfg.applyNoDiff?.call(m as dynamic);
              _clearSyncFlag(m);
              break;
            }
          }
        });
      },
    );
    _helpers[cfg.collectionName] = helper;
    // Outbox hydration + initial flush. Fire-and-forget by design, but
    // surface failures instead of dropping them (the Future was previously
    // unhandled, so errors vanished silently).
    helper
        .initOutbox()
        .then((_) => helper.flushAllPending())
        .catchError((Object e) {
      AppLogger.log(
        '${cfg.collectionName} outbox init/flush error: $e',
        error: e,
        isError: true,
      );
    });
    // Queue a full sync for existing items already flagged as needing sync.
    for (final m in cfg.results) {
      try {
        final needsSync =
            Function.apply(cfg.needsSync as Function, [m]) as bool;
        if (needsSync) {
          final id = Function.apply(cfg.idSelector as Function, [m]);
          helper.scheduleFullSync(id);
        }
      } catch (e) {
        AppLogger.log('${cfg.collectionName} Error queueing sync: $e');
      }
    }
    // Listen for local changes in this collection and schedule diff syncs.
    final sub = cfg.results.changes.listen(
      (changes) {
        AppLogger.log(
          '${cfg.collectionName} results.changes fired: ${changes.results.length} items, _isApplyingServerUpdate=$_isApplyingServerUpdate',
        );
        // Skip while applying server updates: prevents echo loops and
        // unnecessary computeDiff / cancelForId churn.
        if (_isApplyingServerUpdate) return;
        for (final m in changes.results) {
          try {
            final needsSync =
                Function.apply(cfg.needsSync as Function, [m]) as bool;
            if (needsSync) {
              final id = Function.apply(cfg.idSelector as Function, [m]);
              // sync_updated_at must be set by the user in their write()
              // block; we cannot write here (change-notification callback).
              AppLogger.log(
                '${cfg.collectionName} needsSync=true for $id, calling computeAndScheduleDiff',
              );
              helper.computeAndScheduleDiff(id, cfg.collectionName);
              _controller.add(SyncObjectEvent(cfg.collectionName, id, m));
              AppLogger.log('${cfg.collectionName} queued partial sync $id');
            } else {
              AppLogger.log(
                '${cfg.collectionName} needsSync=false for item, skipping',
              );
            }
          } catch (e) {
            AppLogger.log(
              '${cfg.collectionName} Error processing change: $e',
            );
          }
        }
      },
      onError: (error) {
        AppLogger.log(
          '${cfg.collectionName} Sync Error: $error',
          error: error,
          isError: true,
        );
      },
      onDone: () {
        AppLogger.log('${cfg.collectionName} Sync Done');
      },
    );
    _subscriptions[cfg.collectionName] = sub;
  }
  // Inbound: bootstrap initial data for a collection.
  socket.on('sync:bootstrap', (payload) {
    try {
      final collection = payload['collection'] as String?;
      final data = payload['data'] as List<dynamic>?;
      if (collection == null || data == null) return;
      final cfg = configs.firstWhere(
        (c) => c.collectionName == collection,
        orElse: () => throw StateError('Unknown collection: $collection'),
      );
      final helper = _helpers[collection];
      if (helper == null) return;
      // Suppress local change listeners while applying server data.
      _isApplyingServerUpdate = true;
      try {
        // Apply all bootstrap objects in a single transaction; timestamp
        // persistence is deferred until after the write commits.
        realm.write(() {
          for (final d in data) {
            final map = Map<String, dynamic>.from(d as Map);
            final id = (map['_id'] ?? map['id']).toString();
            // Ensure a UTC-millis updated marker exists.
            final ts =
                map['sync_updated_at'] is int
                    ? (map['sync_updated_at'] as int)
                    : DateTime.now().toUtc().millisecondsSinceEpoch;
            map['_id'] = id;
            map['sync_updated_at'] = ts;
            // Config-captured decoder preserves the generic model type T.
            final RealmObject obj = cfg.decode!(map);
            realm.add(obj, update: true);
            helper.cancelForId(id);
            helper.setBaselineFromModel(id);
            final newTs = _max(_lastRemoteTsByCollection[collection], ts);
            _lastRemoteTsByCollection[collection] = newTs;
          }
        });
        // Persist the latest timestamp AFTER the write transaction.
        final latest = _lastRemoteTsByCollection[collection];
        if (latest != null) {
          _persistTimestamp(collection, latest);
        }
      } finally {
        _isApplyingServerUpdate = false;
      }
    } catch (e) {
      AppLogger.log('bootstrap apply error: $e', error: e, isError: true);
    }
  });
  // Inbound: apply server changes with conflict management (UTC millis).
  socket.on('sync:changes', (raw) {
    try {
      final list = (raw as List).cast<Map>();
      // Suppress local change listeners while applying server data.
      _isApplyingServerUpdate = true;
      try {
        final Set<String> touchedCollections = <String>{};
        realm.write(() {
          for (final c in list) {
            final collection = c['collection']?.toString();
            final id = c['documentId']?.toString();
            final op = c['operation']?.toString();
            final ts =
                c['timestamp'] is int
                    ? (c['timestamp'] as int)
                    : DateTime.now().toUtc().millisecondsSinceEpoch;
            if (collection == null || id == null || op == null) continue;
            final cfg = configs.firstWhere(
              (cfg) => cfg.collectionName == collection,
              orElse:
                  () => throw StateError('Unknown collection: $collection'),
            );
            final helper = _helpers[collection];
            if (helper == null) continue;
            // Find the existing object by scanning the results; dynamic
            // invocation bypasses the erased generic type.
            RealmObject? existing;
            for (final obj in cfg.results) {
              final objId = Function.apply(cfg.idSelector as Function, [obj]);
              if (objId == id) {
                existing = obj;
                break;
              }
            }
            // Conflict rule: skip when the local copy is newer or equal,
            // but still advance the remote watermark so we don't refetch.
            final localTs = _readUpdatedAt(existing);
            if (localTs != null && localTs >= ts) {
              final newTs = _max(_lastRemoteTsByCollection[collection], ts);
              _lastRemoteTsByCollection[collection] = newTs;
              touchedCollections.add(collection);
              continue;
            }
            if (op == 'delete') {
              if (existing != null) {
                realm.delete(existing);
              }
              helper.cancelForId(id);
            } else {
              final data = Map<String, dynamic>.from(
                (c['data'] as Map?) ?? {},
              );
              data['_id'] = id;
              data['sync_updated_at'] = ts; // UTC millis
              final RealmObject obj = cfg.decode!(data);
              realm.add(obj, update: true);
              helper.cancelForId(id);
              helper.setBaselineFromModel(id);
            }
            final newTs = _max(_lastRemoteTsByCollection[collection], ts);
            _lastRemoteTsByCollection[collection] = newTs;
            touchedCollections.add(collection);
          }
        });
        // Persist timestamps outside the write transaction.
        for (final col in touchedCollections) {
          final latest = _lastRemoteTsByCollection[col];
          if (latest != null) {
            _persistTimestamp(col, latest);
          }
        }
      } finally {
        _isApplyingServerUpdate = false;
      }
    } catch (e) {
      AppLogger.log('sync:changes apply error: $e', error: e, isError: true);
    }
  });
  // On (re)connect: request changes missed since the last remote timestamp.
  socket.on('connect', (_) {
    for (final cfg in configs) {
      final since = _lastRemoteTsByCollection[cfg.collectionName] ?? 0;
      final sub = _subscriptionsByCollection[cfg.collectionName];
      socket.emitWithAck(
        'sync:get_changes',
        {
          'userId': userId, // pass authenticated userId for server scoping
          'collection': cfg.collectionName,
          'since': since,
          'limit': 500,
          if (sub != null) 'filter': sub['filter'],
          if (sub != null) 'args': sub['args'],
        },
        ack: (resp) {
          try {
            final map = (resp as Map?) ?? const {};
            final changes = (map['changes'] as List?) ?? const [];
            final latest = map['latestTimestamp'] as int?;
            if (latest != null) {
              final newTs = _max(
                _lastRemoteTsByCollection[cfg.collectionName],
                latest,
              );
              _lastRemoteTsByCollection[cfg.collectionName] = newTs;
            }
            // Apply changes locally without re-emitting. Guard with
            // _isApplyingServerUpdate for consistency with the
            // 'sync:bootstrap' and 'sync:changes' apply paths (this path
            // previously omitted the guard).
            _isApplyingServerUpdate = true;
            try {
              final Set<String> touchedCollections = <String>{};
              realm.write(() {
                for (final c in changes.cast<Map>()) {
                  final collection = c['collection']?.toString();
                  final id = c['documentId']?.toString();
                  final op = c['operation']?.toString();
                  final ts =
                      c['timestamp'] is int
                          ? (c['timestamp'] as int)
                          : DateTime.now().toUtc().millisecondsSinceEpoch;
                  if (collection == null || id == null || op == null) {
                    continue;
                  }
                  final cfg2 = configs.firstWhere(
                    (cfgx) => cfgx.collectionName == collection,
                    orElse:
                        () => throw StateError(
                          'Unknown collection: $collection',
                        ),
                  );
                  final helper2 = _helpers[collection];
                  if (helper2 == null) continue;
                  // Find the existing object by scanning the results.
                  RealmObject? existing;
                  for (final obj in cfg2.results) {
                    final objId = Function.apply(
                      cfg2.idSelector as Function,
                      [obj],
                    );
                    if (objId == id) {
                      existing = obj;
                      break;
                    }
                  }
                  // Conflict rule: local newer/equal wins; advance watermark.
                  final localTs = _readUpdatedAt(existing);
                  if (localTs != null && localTs >= ts) {
                    final newTs = _max(
                      _lastRemoteTsByCollection[collection],
                      ts,
                    );
                    _lastRemoteTsByCollection[collection] = newTs;
                    touchedCollections.add(collection);
                    continue;
                  }
                  if (op == 'delete') {
                    if (existing != null) {
                      realm.delete(existing);
                    }
                    helper2.cancelForId(id);
                  } else {
                    final data = Map<String, dynamic>.from(
                      (c['data'] as Map?) ?? {},
                    );
                    data['_id'] = id;
                    data['sync_updated_at'] = ts; // UTC millis
                    final RealmObject obj = cfg2.decode!(data);
                    realm.add(obj, update: true);
                    helper2.cancelForId(id);
                    helper2.setBaselineFromModel(id);
                  }
                  final newTs = _max(_lastRemoteTsByCollection[collection], ts);
                  _lastRemoteTsByCollection[collection] = newTs;
                  touchedCollections.add(collection);
                }
              });
              // Persist timestamps after the write.
              for (final col in touchedCollections) {
                final ts = _lastRemoteTsByCollection[col];
                if (ts != null) {
                  _persistTimestamp(col, ts);
                }
              }
            } finally {
              _isApplyingServerUpdate = false;
            }
          } catch (e) {
            // Previously a silent `catch (_) {}`; log like the sibling
            // 'sync:bootstrap' / 'sync:changes' handlers do.
            AppLogger.log(
              'sync:get_changes ack apply error: $e',
              error: e,
              isError: true,
            );
          }
        },
      );
    }
  });
  _started = true;
}