benchmarkIngestLatency static method
Measures the wall-clock latency of the prepare → drain → finalize cycle across the three ingest entrypoint variants (per-iteration source deletion happens outside the timed window). Stub embeddings (a zeroed Float32List) keep the measurement focused on the FFI + Rust pipeline cost rather than ONNX inference time. Each variant runs warmupRuns warmup iterations followed by measuredRuns measured iterations, and reports p50 / p95 / mean / stdev in milliseconds per variant.
Implementation
/// Measures wall-clock latency of the prepare → drain → finalize cycle
/// across the three ingest entrypoint variants (string, UTF-8 bytes, file).
///
/// Stub embeddings (a zeroed [Float32List] of [embeddingDim]) keep the
/// measurement focused on the FFI + Rust pipeline cost rather than ONNX
/// inference time. Each variant runs [warmupRuns] warmup iterations followed
/// by [measuredRuns] measured iterations; stats are p50 / p95 / mean / stdev
/// in milliseconds per variant. Per-iteration source deletion happens
/// outside the timed window.
///
/// A throwaway database at [dbPathOverride] (or a default path under the
/// application documents directory) is created for the benchmark and removed
/// afterwards — even if a run throws. If [restoreDbPath] is non-null the DB
/// pool is re-initialized against it on the way out, so callers can hand
/// back their production database path.
static Future<IngestLatencyBenchResult> benchmarkIngestLatency({
  int targetBytes = 1 * 1024 * 1024,
  int embeddingDim = 384,
  int maxChunkChars = 1500,
  int overlapChars = 0,
  int batchSize = 16,
  int warmupRuns = 2,
  int measuredRuns = 5,
  String? restoreDbPath,
  String? dbPathOverride,
}) async {
  final content = _generateBenchDoc(targetBytes);
  final benchDbPath = dbPathOverride ??
      "${(await getApplicationDocumentsDirectory()).path}/ingest_latency_bench.sqlite";
  final benchFile = File(benchDbPath);
  if (await benchFile.exists()) {
    await benchFile.delete();
  }
  final benchTextFile = File('$benchDbPath.txt');
  if (await benchTextFile.exists()) {
    await benchTextFile.delete();
  }
  await benchTextFile.writeAsString(content, flush: true);
  // Read the bytes back so the UTF-8 variant receives genuine UTF-8 bytes:
  // writeAsString encodes UTF-8 by default, whereas content.codeUnits would
  // yield UTF-16 code units, which only coincide with UTF-8 for pure-ASCII
  // documents.
  final contentBytes = await benchTextFile.readAsBytes();
  // True UTF-8 byte count of the generated document (not the code-unit
  // count), so the reported docBytes matches what the byte/file paths see.
  final docUtf8Bytes = contentBytes.length;
  await initDbPool(dbPath: benchDbPath, maxSize: 4);
  await initDb();
  await source_rag.initSourceDb();
  final stubEmbedding = Float32List(embeddingDim);

  // Runs one full ingest cycle for [variant] and returns the elapsed time
  // in milliseconds. The stopwatch covers prepare + drain + finalize; the
  // source deletion afterwards is untimed cleanup.
  Future<double> runOnce(_LatencyVariant variant, int iteration) async {
    final collectionId = 'bench-latency-${variant.name}-$iteration';
    final sw = Stopwatch()..start();
    late ingest_session.PreparedIngestion prepared;
    switch (variant) {
      case _LatencyVariant.string:
        prepared = await ingest_session.prepareSourceIngestion(
          collectionId: collectionId,
          content: content,
          metadata: null,
          name: collectionId,
          strategy: ingest_session.IngestStrategy.recursive,
          maxChars: maxChunkChars,
          overlapChars: overlapChars,
        );
        break;
      case _LatencyVariant.utf8:
        prepared = await ingest_session.prepareSourceIngestionFromUtf8(
          collectionId: collectionId,
          contentBytes: contentBytes,
          metadata: null,
          name: collectionId,
          strategy: ingest_session.IngestStrategy.recursive,
          maxChars: maxChunkChars,
          overlapChars: overlapChars,
        );
        break;
      case _LatencyVariant.file:
        prepared = await ingest_session.prepareSourceIngestionFromFile(
          collectionId: collectionId,
          filePath: benchTextFile.path,
          metadata: null,
          name: collectionId,
          strategyHint: ingest_session.IngestStrategy.recursive,
          maxChars: maxChunkChars,
          overlapChars: overlapChars,
        );
        break;
    }
    final session = prepared.session;
    if (session == null) {
      throw StateError(
        'benchmarkIngestLatency: prepare returned no session for variant '
        '${variant.name} (state=${prepared.state}).',
      );
    }
    try {
      // Drain the session in batches, answering every embedding request
      // with the zeroed stub so no inference cost leaks into the timing.
      var saved = 0;
      while (saved < prepared.totalChunks) {
        final batch = await session.takeEmbeddingBatch(batchSize: batchSize);
        if (batch.isEmpty) break;
        final embeddings = batch
            .map(
              (req) => ingest_session.ChunkEmbedding(
                chunkIndex: req.chunkIndex,
                embedding: stubEmbedding,
              ),
            )
            .toList(growable: false);
        saved += await session.commitEmbeddings(embeddings: embeddings);
      }
      await session.finalize();
    } finally {
      await session.dispose();
    }
    sw.stop();
    // Untimed: remove the ingested source so each iteration starts clean.
    await source_rag.deleteSourceInCollection(
      collectionId: collectionId,
      sourceId: prepared.sourceId,
    );
    return sw.elapsedMicroseconds / 1000.0;
  }

  // Warms up, then collects [measuredRuns] samples and summarizes them.
  // Warmup iterations use negative ids so their collection ids never
  // collide with measured ones.
  Future<IngestLatencyStats> measureVariant(_LatencyVariant variant) async {
    for (var i = 0; i < warmupRuns; i++) {
      await runOnce(variant, -1 - i);
    }
    final samples = <double>[];
    for (var i = 0; i < measuredRuns; i++) {
      samples.add(await runOnce(variant, i));
    }
    samples.sort();
    final mean = samples.reduce((a, b) => a + b) / samples.length;
    return IngestLatencyStats(
      p50Ms: _percentileFromSorted(samples, 0.5),
      p95Ms: _percentileFromSorted(samples, 0.95),
      meanMs: mean,
      stdevMs: _stdDev(samples, mean),
      samples: List.unmodifiable(samples),
    );
  }

  try {
    final stringStats = await measureVariant(_LatencyVariant.string);
    final utf8Stats = await measureVariant(_LatencyVariant.utf8);
    final fileStats = await measureVariant(_LatencyVariant.file);
    return IngestLatencyBenchResult(
      docBytes: docUtf8Bytes,
      warmupRuns: warmupRuns,
      measuredRuns: measuredRuns,
      stringPath: stringStats,
      utf8Path: utf8Stats,
      filePath: fileStats,
    );
  } finally {
    // Always tear down the throwaway pool and scratch files, and restore
    // the caller's database, even when a benchmark run throws — otherwise a
    // failed run would leave the app pointed at the bench database.
    await closeDbPool();
    if (await benchFile.exists()) {
      await benchFile.delete();
    }
    if (await benchTextFile.exists()) {
      await benchTextFile.delete();
    }
    if (restoreDbPath != null) {
      await initDbPool(dbPath: restoreDbPath, maxSize: 4);
    }
  }
}