changeEvents property

Stream<List<SyncChange>> get changeEvents

A broadcast stream of incoming synced data changes.

Each event contains a list of SyncChange objects, one per affected entity type, with the IDs of objects that were put or removed.

Subscribe (listen) to the stream to start receiving events. Cancel the subscription when no longer needed to free resources.

Implementation

Stream<List<SyncChange>> get changeEvents {
  if (_changeEvents == null) {
    // This stream combines events from two C listeners: connect & disconnect.
    _changeEvents = _SyncListenerGroup<List<SyncChange>>('sync-change');

    final entityTypesById = InternalStoreAccess.entityTypeById(_store);

    _changeEvents!.add(_SyncListenerConfig(
        (int nativePort) => C.dartc_sync_listener_change(_ptr, nativePort),
        (dynamic msg, controller) {
      if (msg is! List) {
        controller.addError(ObjectBoxException(
            'Received invalid data type from the core notification: (${msg.runtimeType}) $msg'));
        return;
      }

      final syncChanges = msg;

      // List<SyncChange> is flattened to List<dynamic>, with SyncChange object
      // properties always coming in groups of three (entityId, puts, removals)
      const numProperties = 3;
      if (syncChanges.length % numProperties != 0) {
        controller.addError(ObjectBoxException(
            'Received invalid list length from the core notification: (${syncChanges.runtimeType}) $syncChanges'));
        return;
      }

      final changes = <SyncChange>[];
      for (var i = 0; i < syncChanges.length / numProperties; i++) {
        final dynamic entityId = syncChanges[i * numProperties + 0];
        final dynamic putsBytes = syncChanges[i * numProperties + 1];
        final dynamic removalsBytes = syncChanges[i * numProperties + 2];

        final entityType = entityTypesById[entityId];
        if (entityType == null) {
          controller.addError(ObjectBoxException(
              'Received sync change notification for an unknown entity ID $entityId'));
          return;
        }

        if (entityId is! int ||
            putsBytes is! Uint8List ||
            removalsBytes is! Uint8List) {
          controller.addError(ObjectBoxException(
              'Received invalid list items format from the core notification at i=$i: '
              'entityId = (${entityId.runtimeType}) $entityId; '
              'putsBytes = (${putsBytes.runtimeType}) $putsBytes; '
              'removalsBytes = (${removalsBytes.runtimeType}) $removalsBytes'));
          return;
        }

        changes.add(SyncChange._(
            entityId,
            entityType,
            Uint64List.view(putsBytes.buffer).toList(),
            Uint64List.view(removalsBytes.buffer).toList()));
      }

      controller.add(changes);
    }));

    _changeEvents!.finish();
  }
  return _changeEvents!.stream;
}