changeEvents property
Stream<List<SyncChange>>
get
changeEvents
Get a broadcast stream of incoming synced data changes.
Subscribe (listen) to the stream to actually start listening to events.
Implementation
/// A broadcast stream of incoming synced data changes.
///
/// Subscribe (listen) to the stream to actually start listening to events.
///
/// The listener group is created on first access and cached in
/// [_changeEvents]; later accesses return the cached group's stream.
/// Malformed notifications are not thrown but reported to subscribers as
/// stream errors carrying an [ObjectBoxException]; in that case the whole
/// notification is dropped and no partial change list is emitted.
Stream<List<SyncChange>> get changeEvents {
  final existing = _changeEvents;
  if (existing != null) return existing.stream;

  // This stream is fed by a single C listener: change.
  final group = _SyncListenerGroup<List<SyncChange>>('sync-change');
  _changeEvents = group;
  final entityTypesById = InternalStoreAccess.entityTypeById(_store);

  // Decodes a packed array of host-endian 64-bit IDs. Reads go through a
  // ByteData view over exactly the bytes of [bytes], so a list that is a view
  // into a larger (or unaligned) buffer is decoded correctly.
  // Returns null if the byte length is not a whole number of IDs.
  List<int>? decodeIds(Uint8List bytes) {
    const idSize = Uint64List.bytesPerElement;
    if (bytes.lengthInBytes % idSize != 0) return null;
    final data = ByteData.sublistView(bytes);
    return [
      for (var offset = 0; offset < bytes.lengthInBytes; offset += idSize)
        data.getUint64(offset, Endian.host),
    ];
  }

  group.add(_SyncListenerConfig(
      (int nativePort) => C.dartc_sync_listener_change(_ptr, nativePort),
      (dynamic msg, controller) {
    if (msg is! List) {
      controller.addError(ObjectBoxException(
          'Received invalid data type from the core notification: (${msg.runtimeType}) $msg'));
      return;
    }
    final syncChanges = msg;

    // List<SyncChange> is flattened to List<dynamic>, with SyncChange object
    // properties always coming in groups of three (entityId, puts, removals)
    const numProperties = 3;
    if (syncChanges.length % numProperties != 0) {
      controller.addError(ObjectBoxException(
          'Received invalid list length from the core notification: (${syncChanges.runtimeType}) $syncChanges'));
      return;
    }

    final changes = <SyncChange>[];
    final groupCount = syncChanges.length ~/ numProperties;
    for (var i = 0; i < groupCount; i++) {
      final base = i * numProperties;
      final Object? entityId = syncChanges[base];
      final Object? putsBytes = syncChanges[base + 1];
      final Object? removalsBytes = syncChanges[base + 2];

      // Validate the item types before using entityId as a map key, so that
      // a malformed item is reported as such and not as an unknown entity.
      if (entityId is! int ||
          putsBytes is! Uint8List ||
          removalsBytes is! Uint8List) {
        controller.addError(ObjectBoxException(
            'Received invalid list items format from the core notification at i=$i: '
            'entityId = (${entityId.runtimeType}) $entityId; '
            'putsBytes = (${putsBytes.runtimeType}) $putsBytes; '
            'removalsBytes = (${removalsBytes.runtimeType}) $removalsBytes'));
        return;
      }

      final entityType = entityTypesById[entityId];
      if (entityType == null) {
        controller.addError(ObjectBoxException(
            'Received sync change notification for an unknown entity ID $entityId'));
        return;
      }

      final puts = decodeIds(putsBytes);
      final removals = decodeIds(removalsBytes);
      if (puts == null || removals == null) {
        controller.addError(ObjectBoxException(
            'Received invalid ID list byte length from the core notification at i=$i: '
            'putsBytes.length = ${putsBytes.lengthInBytes}; '
            'removalsBytes.length = ${removalsBytes.lengthInBytes}'));
        return;
      }

      changes.add(SyncChange._(entityId, entityType, puts, removals));
    }
    controller.add(changes);
  }));
  group.finish();

  return group.stream;
}