persist method
Implementation
@override
persist(docs) async {
await isInitialized;
for (final doc in docs) {
final collection = doc.collection;
final persistorSettings = doc.persistorSettings ?? this.persistorSettings;
final indexId = _getIndexId(collection, doc.id);
if (!persistorSettings.persistenceEnabled) {
continue;
}
String? documentDataStoreShard;
final FileDataStore documentDataStore;
final String documentDataStoreFilename;
if (persistorSettings is FilePersistorSettings &&
persistorSettings.shardEnabled &&
persistorSettings.maxShards > 1) {
final maxShards = persistorSettings.maxShards;
documentDataStoreShard = persistorSettings.getShard(doc);
final documentDataStoreShardFilename = buildFileDataStoreFilename(
collection: collection,
shard: documentDataStoreShard,
settings: persistorSettings,
);
if (!_fileDataStoreIndex.containsKey(documentDataStoreShardFilename)) {
final collectionFileDataStores =
_fileDataStoreIndex.values.where((fileDataStore) {
return fileDataStore.collection == collection;
}).toList();
if (collectionFileDataStores.length >= maxShards) {
// If the collection has reached its max shards already, then any additional documents
// that do not hash to an existing shard are stored in one of them at random.
final shardIndex = Random().nextInt(maxShards);
documentDataStoreShard = collectionFileDataStores[shardIndex].shard;
}
}
}
documentDataStoreFilename = buildFileDataStoreFilename(
collection: collection,
shard: documentDataStoreShard,
settings: persistorSettings,
);
if (_fileDataStoreIndex.containsKey(documentDataStoreFilename)) {
documentDataStore = _fileDataStoreIndex[documentDataStoreFilename]!;
} else {
documentDataStore =
_fileDataStoreIndex[documentDataStoreFilename] = buildFileDataStore(
collection: collection,
shard: documentDataStoreShard,
settings: persistorSettings,
);
}
// If the document has changed the file data store it is to be persisted in, it should be removed
// from its previous data store.
final prevDocumentDataStore = _documentFileDataStoreIndex[indexId];
if (prevDocumentDataStore != null &&
documentDataStore != prevDocumentDataStore) {
prevDocumentDataStore.removeDocument(doc.id);
}
_documentFileDataStoreIndex[indexId] = documentDataStore;
documentDataStore.updateDocument(doc);
}
final fileDataStoresToPersist = _fileDataStoreIndex.values
.where((fileDataStore) => fileDataStore.shouldPersist)
.toList();
// If for some reason one or more writes fail, then that is still recoverable as the file data store collections
// maintains in-memory the latest state of the world, so on next broadcast, any data stores that still
// require writing will retry at that time.
await Future.wait(
fileDataStoresToPersist.map(
(dataStore) => dataStore.persist(),
),
);
}