bulkUpsert method
Implementation
@override
Future<List<DataRecord>> bulkUpsert(BulkUpsertRequest request) async {
final results = <DataRecord>[];
final writes = <DataRecord>[];
final events = <MapEntry<DataChangeType, DataRecord>>[];
final groupedByCollection = <String, List<DataRecord>>{};
for (final record in request.records) {
groupedByCollection
.putIfAbsent(record.collection, () => <DataRecord>[])
.add(record);
}
final existingByCollection = <String, Map<String, DataRecord>>{};
for (final entry in groupedByCollection.entries) {
final ids = entry.value.map((record) => record.id).toSet();
if (ids.isEmpty) {
existingByCollection[entry.key] = const <String, DataRecord>{};
continue;
}
existingByCollection[entry.key] = await storage.readRecords(
entry.key,
ids,
);
}
for (final incoming in request.records) {
final existing = existingByCollection[incoming.collection]?[incoming.id];
if (existing == null) {
writes.add(incoming);
events.add(MapEntry(DataChangeType.created, incoming));
results.add(incoming);
continue;
}
if (incoming.version <= existing.version) {
throw RpcDataError.conflict(
'Version ${incoming.version} is not newer than ${existing.version} for ${incoming.id}',
);
}
final updated = DataRecord(
id: existing.id,
collection: existing.collection,
payload: incoming.payload,
version: incoming.version,
createdAt: existing.createdAt,
updatedAt: incoming.updatedAt,
);
writes.add(updated);
events.add(MapEntry(DataChangeType.updated, updated));
results.add(updated);
}
if (writes.isNotEmpty) {
for (final record in writes) {
await _validatePayload(record.collection, record.payload);
}
await storage.writeRecords(writes);
for (final entry in events) {
await _recordEvent(entry.key, entry.value);
}
}
return results;
}