bulkUpsert method

@override
Future<List<DataRecord>> bulkUpsert(BulkUpsertRequest request)
override

Implementation

/// Creates or updates every record in [request] as one batch.
///
/// The records currently held by `storage` are read once per collection. An
/// incoming record whose id is unknown is stored as given and reported as
/// [DataChangeType.created]. An incoming record whose id is known is reported
/// as [DataChangeType.updated]: the `id`, `collection` and `createdAt` of the
/// known record are kept, while `payload`, `version` and `updatedAt` are
/// taken from the incoming record.
///
/// When the same collection/id pair occurs more than once in the request,
/// each later occurrence is resolved against the result of the previous
/// occurrence rather than against the stale storage snapshot, so the batch
/// behaves like the same upserts applied one after another.
///
/// Every resulting payload is passed to `_validatePayload` before anything
/// is written; change events are recorded only after the write completed.
///
/// Returns the resulting records in the order of `request.records`.
///
/// Throws the error created by `RpcDataError.conflict` when an incoming
/// version is not strictly greater than the version it would replace. This
/// happens before any write is issued.
@override
Future<List<DataRecord>> bulkUpsert(BulkUpsertRequest request) async {
  final results = <DataRecord>[];
  final writes = <DataRecord>[];
  final events = <MapEntry<DataChangeType, DataRecord>>[];

  final groupedByCollection = <String, List<DataRecord>>{};
  for (final record in request.records) {
    groupedByCollection
        .putIfAbsent(record.collection, () => <DataRecord>[])
        .add(record);
  }

  // Latest known state per collection and id. Seeded from storage and then
  // updated while the batch is resolved. The storage result is copied because
  // it is not known to be modifiable.
  final latestByCollection = <String, Map<String, DataRecord>>{};
  for (final entry in groupedByCollection.entries) {
    // Every group holds at least one record, so the id set is never empty.
    final ids = entry.value.map((record) => record.id).toSet();
    latestByCollection[entry.key] = Map<String, DataRecord>.of(
      await storage.readRecords(entry.key, ids),
    );
  }

  for (final incoming in request.records) {
    final latest = latestByCollection.putIfAbsent(
      incoming.collection,
      () => <String, DataRecord>{},
    );
    final existing = latest[incoming.id];

    final DataRecord resolved;
    final DataChangeType change;
    if (existing == null) {
      resolved = incoming;
      change = DataChangeType.created;
    } else {
      if (incoming.version <= existing.version) {
        throw RpcDataError.conflict(
          'Version ${incoming.version} is not newer than ${existing.version} for ${incoming.id}',
        );
      }
      resolved = DataRecord(
        id: existing.id,
        collection: existing.collection,
        payload: incoming.payload,
        version: incoming.version,
        createdAt: existing.createdAt,
        updatedAt: incoming.updatedAt,
      );
      change = DataChangeType.updated;
    }

    // Make the outcome visible to later entries that target the same record.
    latest[incoming.id] = resolved;

    writes.add(resolved);
    events.add(MapEntry(change, resolved));
    results.add(resolved);
  }

  if (writes.isNotEmpty) {
    // Validate the whole batch first so nothing is written if a payload is
    // rejected (assuming `_validatePayload` rejects by throwing).
    for (final record in writes) {
      await _validatePayload(record.collection, record.payload);
    }
    await storage.writeRecords(writes);
    for (final entry in events) {
      await _recordEvent(entry.key, entry.value);
    }
  }

  return results;
}