start method

void start()

Start the sync engine and begin synchronizing all configured collections.

This method:

  1. Validates all collection configurations
  2. Creates sync helpers for each collection
  3. Hydrates the persistent outbox (restores pending changes)
  4. Queues initial sync for objects marked as needing sync
  5. Sets up Socket.IO event listeners for server updates
  6. Starts monitoring local Realm changes

Call this once after construction. Subsequent calls are ignored.

Example:

final realmSync = RealmSync(realm: realm, socket: socket, ...);
realmSync.start(); // Start syncing

// Now write data and it will sync automatically
realm.writeWithSync(message, () {
  message.text = "Hello!";
  message.syncUpdateDb = true;
});

Important: Ensure Socket.IO is connected before calling start(), or sync operations will queue until connection is established.

Implementation

/// Starts the sync engine and wires up every collection in [configs].
///
/// For each configured collection this validates the config, builds a
/// [SyncHelper], starts outbox hydration followed by a flush of pending
/// items, schedules a full sync for every existing object whose `needsSync`
/// selector returns true, and subscribes to the collection's change stream.
/// Afterwards it registers the `sync:bootstrap`, `sync:changes` and `connect`
/// socket handlers; the `connect` handler requests missed changes through
/// `sync:get_changes`.
///
/// Once a call has run to completion, later calls return immediately.
void start() {
  if (_started) return;

  // Applies a batch of server change records in a single write transaction
  // and then persists the newest remote timestamp of every collection that
  // was touched. Shared by the live `sync:changes` event and the
  // `sync:get_changes` acknowledgement so both paths behave identically.
  //
  // `_isApplyingServerUpdate` is raised for the duration of the call so the
  // per-collection change listeners skip these writes instead of scheduling
  // them for upload again.
  void applyRemoteChanges(Iterable<Map> changes) {
    _isApplyingServerUpdate = true;
    try {
      final touchedCollections = <String>{};
      realm.write(() {
        for (final c in changes) {
          final collection = c['collection']?.toString();
          final id = c['documentId']?.toString();
          final op = c['operation']?.toString();
          // Fall back to "now" (UTC millis) when the record has no int
          // timestamp.
          final ts =
              c['timestamp'] is int
                  ? (c['timestamp'] as int)
                  : DateTime.now().toUtc().millisecondsSinceEpoch;
          if (collection == null || id == null || op == null) continue;
          final targetCfg = configs.firstWhere(
            (candidate) => candidate.collectionName == collection,
            orElse: () => throw StateError('Unknown collection: $collection'),
          );
          final targetHelper = _helpers[collection];
          if (targetHelper == null) continue;

          // Find the existing object by scanning the results; the id
          // selector is invoked dynamically to bypass generic type checks.
          RealmObject? existing;
          for (final obj in targetCfg.results) {
            final objId = Function.apply(targetCfg.idSelector as Function, [
              obj,
            ]);
            if (objId == id) {
              existing = obj;
              break;
            }
          }

          // Conflict rule: keep the local object when it is newer or equal,
          // but still advance the remote cursor for the collection.
          final localTs = _readUpdatedAt(existing);
          if (localTs != null && localTs >= ts) {
            _lastRemoteTsByCollection[collection] = _max(
              _lastRemoteTsByCollection[collection],
              ts,
            );
            touchedCollections.add(collection);
            continue;
          }

          if (op == 'delete') {
            if (existing != null) {
              realm.delete(existing);
            }
            targetHelper.cancelForId(id);
          } else {
            final data = Map<String, dynamic>.from((c['data'] as Map?) ?? {});
            data['_id'] = id;
            data['sync_updated_at'] = ts; // UTC millis
            final RealmObject obj = targetCfg.decode!(data);
            realm.add(obj, update: true);
            targetHelper.cancelForId(id);
            targetHelper.setBaselineFromModel(id);
          }

          _lastRemoteTsByCollection[collection] = _max(
            _lastRemoteTsByCollection[collection],
            ts,
          );
          touchedCollections.add(collection);
        }
      });
      // Persist timestamps outside the write transaction.
      for (final col in touchedCollections) {
        final latest = _lastRemoteTsByCollection[col];
        if (latest != null) {
          _persistTimestamp(col, latest);
        }
      }
    } finally {
      _isApplyingServerUpdate = false;
    }
  }

  for (final cfg in configs) {
    // Comprehensive validation before starting sync
    SyncValidator.validateConfig(cfg);
    SyncValidator.validateSampleModels(cfg);

    // The selectors are invoked through Function.apply to bypass the
    // generic type constraints of the config.
    dynamic idOf(dynamic model) =>
        Function.apply(cfg.idSelector as Function, [model]);
    bool needsSyncFor(dynamic model) =>
        Function.apply(cfg.needsSync as Function, [model]) as bool;

    // Build helper per collection
    final helper = SyncHelper(
      realm: realm,
      socket: socket,
      userId: userId,
      collectionName: cfg.collectionName,
      emitPreProcessor: cfg.emitPreProcessor,
      toSyncMapForId: (String id) {
        // Linear scan to find the object by id (safe for all types).
        String idFromModel(dynamic model) {
          try {
            return idOf(model);
          } catch (e) {
            // Fallback: assume the model has an id property and convert it
            // to a string.
            final fallbackId = (model as dynamic).id;
            return fallbackId is String ? fallbackId : fallbackId.toString();
          }
        }

        for (final m in cfg.results) {
          if (idFromModel(m) == id) {
            // Prefer user-provided mapper
            if (cfg.toSyncMap != null) {
              return cfg.toSyncMap!(m as dynamic);
            }

            final result = RealmJson.toJsonWith(
              m,
              cfg.propertyNames,
              embeddedProperties: cfg.embeddedProperties,
            );
            AppLogger.log(
              'toSyncMapForId($id) returning: ${result.keys.toList()}',
            );
            return result;
          }
        }
        return null;
      },
      // Default sanitizer strips the local-only sync flag from a copy of
      // the outgoing map.
      sanitize:
          cfg.sanitize ??
          (Map<String, dynamic> src) {
            final out = Map<String, dynamic>.from(src);
            out.remove('sync_update_db');
            return out;
          },
      onAckSuccess: (String id) {
        realm.write(() {
          for (final m in cfg.results) {
            if (idOf(m) == id) {
              // Call user-provided callback first if present
              cfg.applyAckSuccess?.call(m as dynamic);
              // Then clear the sync flag on the object
              _clearSyncFlag(m);
              break;
            }
          }
        });
      },
      onNoDiff: (String id) {
        realm.write(() {
          for (final m in cfg.results) {
            if (idOf(m) == id) {
              // Call user-provided callback first if present
              cfg.applyNoDiff?.call(m as dynamic);
              // Then clear the sync flag on the object
              _clearSyncFlag(m);
              break;
            }
          }
        });
      },
    );

    _helpers[cfg.collectionName] = helper;

    // Outbox hydration + initial flush. This is fire-and-forget, so
    // failures are logged here instead of surfacing as unhandled
    // asynchronous errors.
    Future<void> hydrateAndFlush() async {
      try {
        await helper.initOutbox();
        await helper.flushAllPending();
      } catch (e) {
        AppLogger.log(
          '${cfg.collectionName} outbox init/flush error: $e',
          error: e,
          isError: true,
        );
      }
    }

    hydrateAndFlush();

    // Queue full sync for existing items needing sync
    for (final m in cfg.results) {
      try {
        if (needsSyncFor(m)) {
          helper.scheduleFullSync(idOf(m));
        }
      } catch (e) {
        AppLogger.log('${cfg.collectionName} Error queueing sync: $e');
      }
    }

    // Listen for changes in this collection
    final sub = cfg.results.changes.listen(
      (changes) {
        AppLogger.log(
          '${cfg.collectionName} results.changes fired: ${changes.results.length} items, _isApplyingServerUpdate=$_isApplyingServerUpdate',
        );
        // Skip processing if we're applying server updates
        // This prevents unnecessary computeDiff and cancelForId calls
        if (_isApplyingServerUpdate) return;

        for (final m in changes.results) {
          try {
            if (needsSyncFor(m)) {
              final id = idOf(m);

              // Note: sync_updated_at should be set by user in their write() block
              // We can't update it here as we're in a change notification callback

              AppLogger.log(
                '${cfg.collectionName} needsSync=true for $id, calling computeAndScheduleDiff',
              );
              helper.computeAndScheduleDiff(id, cfg.collectionName);
              _controller.add(SyncObjectEvent(cfg.collectionName, id, m));
              AppLogger.log('${cfg.collectionName} queued partial sync $id');
            } else {
              AppLogger.log(
                '${cfg.collectionName} needsSync=false for item, skipping',
              );
            }
          } catch (e) {
            AppLogger.log(
              '${cfg.collectionName} Error processing change: $e',
            );
          }
        }
      },
      onError: (error) {
        AppLogger.log(
          '${cfg.collectionName} Sync Error: $error',
          error: error,
          isError: true,
        );
      },
      onDone: () {
        AppLogger.log('${cfg.collectionName} Sync Done');
      },
    );
    _subscriptions[cfg.collectionName] = sub;
  }

  // Inbound: bootstrap initial data
  socket.on('sync:bootstrap', (payload) {
    try {
      final collection = payload['collection'] as String?;
      final data = payload['data'] as List<dynamic>?;
      if (collection == null || data == null) return;
      final cfg = configs.firstWhere(
        (c) => c.collectionName == collection,
        orElse: () => throw StateError('Unknown collection: $collection'),
      );
      final helper = _helpers[collection];
      if (helper == null) return;

      // Set flag to prevent change listeners from triggering syncs
      _isApplyingServerUpdate = true;
      try {
        // Apply all bootstrap objects in a single transaction; defer timestamp persistence
        realm.write(() {
          for (final d in data) {
            final map = Map<String, dynamic>.from(d as Map);
            final rawId = map['_id'] ?? map['id'];
            if (rawId == null) {
              // Without this guard the record would be stored under the
              // literal id "null".
              AppLogger.log('$collection bootstrap item without id, skipping');
              continue;
            }
            final id = rawId.toString();
            // Ensure UTC updated marker
            final ts =
                map['sync_updated_at'] is int
                    ? (map['sync_updated_at'] as int)
                    : DateTime.now().toUtc().millisecondsSinceEpoch;
            map['_id'] = id;
            map['sync_updated_at'] = ts;
            // Use config-captured decoder (preserves generic T)
            final RealmObject obj = cfg.decode!(map);
            realm.add(obj, update: true);
            helper.cancelForId(id);
            helper.setBaselineFromModel(id);
            _lastRemoteTsByCollection[collection] = _max(
              _lastRemoteTsByCollection[collection],
              ts,
            );
          }
        });
        // Persist latest timestamp AFTER write transaction
        final latest = _lastRemoteTsByCollection[collection];
        if (latest != null) {
          _persistTimestamp(collection, latest);
        }
      } finally {
        _isApplyingServerUpdate = false;
      }
    } catch (e) {
      AppLogger.log('bootstrap apply error: $e', error: e, isError: true);
    }
  });

  // Inbound: apply server changes with conflict management (UTC millis)
  socket.on('sync:changes', (raw) {
    try {
      applyRemoteChanges((raw as List).cast<Map>());
    } catch (e) {
      AppLogger.log('sync:changes apply error: $e', error: e, isError: true);
    }
  });

  // On (re)connect: request missed changes since lastRemoteTs
  socket.on('connect', (_) {
    for (final cfg in configs) {
      final since = _lastRemoteTsByCollection[cfg.collectionName] ?? 0;
      final sub = _subscriptionsByCollection[cfg.collectionName];
      socket.emitWithAck(
        'sync:get_changes',
        {
          'userId': userId, // pass authenticated userId for server scoping
          'collection': cfg.collectionName,
          'since': since,
          'limit': 500,
          if (sub != null) 'filter': sub['filter'],
          if (sub != null) 'args': sub['args'],
        },
        ack: (resp) {
          try {
            final map = (resp as Map?) ?? const {};
            final changes = (map['changes'] as List?) ?? const [];
            final latest = map['latestTimestamp'] as int?;
            if (latest != null) {
              _lastRemoteTsByCollection[cfg.collectionName] = _max(
                _lastRemoteTsByCollection[cfg.collectionName],
                latest,
              );
            }
            // Apply changes locally without re-emitting
            applyRemoteChanges(changes.cast<Map>());
          } catch (e) {
            // Previously swallowed silently; log so failed catch-ups are
            // visible.
            AppLogger.log(
              'sync:get_changes apply error: $e',
              error: e,
              isError: true,
            );
          }
        },
      );
    }
  });

  _started = true;
}