changeEvents property

Stream<List<SyncChange>> get changeEvents

Get a broadcast stream of incoming synced data changes.

Subscribe (listen) to the stream to actually start listening to events.

Implementation

Stream<List<SyncChange>> get changeEvents {
  if (_changeEvents == null) {
    // This stream combines events from two C listeners: connect & disconnect.
    _changeEvents = _SyncListenerGroup<List<SyncChange>>('sync-change');

    final entityTypesById = InternalStoreAccess.entityTypeById(_store);

    _changeEvents!.add(_SyncListenerConfig(
        (int nativePort) => C.dartc_sync_listener_change(_ptr, nativePort),
        (dynamic msg, controller) {
      if (msg is! List) {
        controller.addError(ObjectBoxException(
            'Received invalid data type from the core notification: (${msg.runtimeType}) $msg'));
        return;
      }

      final syncChanges = msg;

      // List<SyncChange> is flattened to List<dynamic>, with SyncChange object
      // properties always coming in groups of three (entityId, puts, removals)
      const numProperties = 3;
      if (syncChanges.length % numProperties != 0) {
        controller.addError(ObjectBoxException(
            'Received invalid list length from the core notification: (${syncChanges.runtimeType}) $syncChanges'));
        return;
      }

      final changes = <SyncChange>[];
      for (var i = 0; i < syncChanges.length / numProperties; i++) {
        final dynamic entityId = syncChanges[i * numProperties + 0];
        final dynamic putsBytes = syncChanges[i * numProperties + 1];
        final dynamic removalsBytes = syncChanges[i * numProperties + 2];

        final entityType = entityTypesById[entityId];
        if (entityType == null) {
          controller.addError(ObjectBoxException(
              'Received sync change notification for an unknown entity ID $entityId'));
          return;
        }

        if (entityId is! int ||
            putsBytes is! Uint8List ||
            removalsBytes is! Uint8List) {
          controller.addError(ObjectBoxException(
              'Received invalid list items format from the core notification at i=$i: '
              'entityId = (${entityId.runtimeType}) $entityId; '
              'putsBytes = (${putsBytes.runtimeType}) $putsBytes; '
              'removalsBytes = (${removalsBytes.runtimeType}) $removalsBytes'));
          return;
        }

        changes.add(SyncChange._(
            entityId,
            entityType,
            Uint64List.view(putsBytes.buffer).toList(),
            Uint64List.view(removalsBytes.buffer).toList()));
      }

      controller.add(changes);
    }));

    _changeEvents!.finish();
  }
  return _changeEvents!.stream;
}