persist method

  1. @override
Future<void> persist(
  1. List<BroadcastDocument> docs
)
override

Implementation

@override
persist(docs) async {
  await isInitialized;

  for (final doc in docs) {
    final collection = doc.collection;
    final persistorSettings = doc.persistorSettings ?? this.persistorSettings;
    final indexId = _getIndexId(collection, doc.id);

    if (!persistorSettings.persistenceEnabled) {
      continue;
    }

    String? documentDataStoreShard;
    final FileDataStore documentDataStore;
    final String documentDataStoreFilename;

    if (persistorSettings is FilePersistorSettings &&
        persistorSettings.shardEnabled &&
        persistorSettings.maxShards > 1) {
      final maxShards = persistorSettings.maxShards;
      documentDataStoreShard = persistorSettings.getShard(doc);
      final documentDataStoreShardFilename = buildFileDataStoreFilename(
        collection: collection,
        shard: documentDataStoreShard,
        settings: persistorSettings,
      );

      if (!_fileDataStoreIndex.containsKey(documentDataStoreShardFilename)) {
        final collectionFileDataStores =
            _fileDataStoreIndex.values.where((fileDataStore) {
          return fileDataStore.collection == collection;
        }).toList();

        if (collectionFileDataStores.length >= maxShards) {
          // If the collection has reached its max shards already, then any additional documents
          // that do not hash to an existing shard are stored in one of them at random.
          final shardIndex = Random().nextInt(maxShards);
          documentDataStoreShard = collectionFileDataStores[shardIndex].shard;
        }
      }
    }

    documentDataStoreFilename = buildFileDataStoreFilename(
      collection: collection,
      shard: documentDataStoreShard,
      settings: persistorSettings,
    );

    if (_fileDataStoreIndex.containsKey(documentDataStoreFilename)) {
      documentDataStore = _fileDataStoreIndex[documentDataStoreFilename]!;
    } else {
      documentDataStore =
          _fileDataStoreIndex[documentDataStoreFilename] = buildFileDataStore(
        collection: collection,
        shard: documentDataStoreShard,
        settings: persistorSettings,
      );
    }

    // If the document has changed the file data store it is to be persisted in, it should be removed
    // from its previous data store.
    final prevDocumentDataStore = _documentFileDataStoreIndex[indexId];
    if (prevDocumentDataStore != null &&
        documentDataStore != prevDocumentDataStore) {
      prevDocumentDataStore.removeDocument(doc.id);
    }

    _documentFileDataStoreIndex[indexId] = documentDataStore;
    documentDataStore.updateDocument(doc);
  }

  final fileDataStoresToPersist = _fileDataStoreIndex.values
      .where((fileDataStore) => fileDataStore.shouldPersist)
      .toList();

  // If for some reason one or more writes fail, then that is still recoverable as the file data store collections
  // maintains in-memory the latest state of the world, so on next broadcast, any data stores that still
  // require writing will retry at that time.
  await Future.wait(
    fileDataStoresToPersist.map(
      (dataStore) => dataStore.persist(),
    ),
  );
}