benchmarkIngestHeap static method
Measure peak process RSS during each ingest entrypoint variant on the realistic caller pattern:
- String path: caller does readAsBytes → utf8.decode → addDocument.
- UTF-8 path: caller does readAsBytes → addDocumentUtf8.
- File path: caller passes the path only → addDocumentFromFile.
Each variant runs in isolation with a fresh DB collection. A 5 ms RSS
sampler runs in parallel and tracks the peak observed during the
operation. Reports peak - baseline per variant.
Throws a StateError if ProcessInfo.currentRss is unsupported on the current platform.
Implementation
/// Measures peak process RSS during each ingest entrypoint variant, using
/// the realistic caller pattern for each:
///
/// - String path: readAsBytes → utf8.decode → prepareSourceIngestion.
/// - UTF-8 path: readAsBytes → prepareSourceIngestionFromUtf8.
/// - File path: path only → prepareSourceIngestionFromFile.
///
/// Each variant runs in isolation with a fresh DB collection. A 5 ms RSS
/// sampler runs in parallel and tracks the peak observed during the
/// operation; the result reports (peak - baseline) per variant.
///
/// Throws a [StateError] if [ProcessInfo.currentRss] is unsupported on the
/// current platform.
static Future<IngestHeapBenchResult> benchmarkIngestHeap({
  int targetBytes = 4 * 1024 * 1024,
  int embeddingDim = 384,
  int maxChunkChars = 1500,
  int overlapChars = 0,
  int batchSize = 16,
  String? restoreDbPath,
  String? dbPathOverride,
}) async {
  if (ProcessInfo.currentRss < 0) {
    throw StateError(
      'benchmarkIngestHeap: ProcessInfo.currentRss is unsupported on this platform',
    );
  }
  final content = _generateBenchDoc(targetBytes);
  // Report the true UTF-8 byte count. String.length counts UTF-16 code
  // units, which only equals the byte count for pure-ASCII content.
  final docUtf8Bytes = utf8.encode(content).length;
  final benchDbPath = dbPathOverride ??
      "${(await getApplicationDocumentsDirectory()).path}/ingest_heap_bench.sqlite";
  final benchFile = File(benchDbPath);
  if (await benchFile.exists()) {
    await benchFile.delete();
  }
  final benchTextFile = File('$benchDbPath.txt');
  if (await benchTextFile.exists()) {
    await benchTextFile.delete();
  }
  await benchTextFile.writeAsString(content, flush: true);
  await initDbPool(dbPath: benchDbPath, maxSize: 4);
  try {
    await initDb();
    await source_rag.initSourceDb();
    // Embedding values are irrelevant to the ingest-path memory profile,
    // so every chunk gets the same zero vector.
    final stubEmbedding = Float32List(embeddingDim);

    // Drives a prepared ingestion to completion (embed stub vectors, commit,
    // finalize), then deletes the source so the next variant starts clean.
    // Returns the number of chunks the preparation produced.
    Future<int> drainAndDelete(
      ingest_session.PreparedIngestion prepared,
      String collectionId,
    ) async {
      final session = prepared.session;
      if (session == null) {
        throw StateError(
          'benchmarkIngestHeap: prepare returned no session '
          '(state=${prepared.state}); benchmark requires a fresh collection.',
        );
      }
      try {
        var saved = 0;
        while (saved < prepared.totalChunks) {
          final batch =
              await session.takeEmbeddingBatch(batchSize: batchSize);
          if (batch.isEmpty) break;
          final embeddings = batch
              .map(
                (req) => ingest_session.ChunkEmbedding(
                  chunkIndex: req.chunkIndex,
                  embedding: stubEmbedding,
                ),
              )
              .toList(growable: false);
          saved += await session.commitEmbeddings(embeddings: embeddings);
        }
        await session.finalize();
      } finally {
        await session.dispose();
      }
      await source_rag.deleteSourceInCollection(
        collectionId: collectionId,
        sourceId: prepared.sourceId,
      );
      return prepared.totalChunks;
    }

    // Runs [runVariant] while sampling RSS every 5 ms; returns peak minus
    // the pre-run baseline.
    Future<int> measureVariant(Future<void> Function() runVariant) async {
      // Two-phase: baseline (let any prior GC settle), then sample peak
      // during the variant. 5 ms cadence is fast enough to catch the
      // body-allocation spike for a multi-MB doc.
      await Future.delayed(const Duration(milliseconds: 50));
      final baseline = ProcessInfo.currentRss;
      var peak = baseline;
      final sampler = Timer.periodic(const Duration(milliseconds: 5), (_) {
        final r = ProcessInfo.currentRss;
        if (r > peak) peak = r;
      });
      try {
        await runVariant();
      } finally {
        sampler.cancel();
        // One final sample in case the spike outlived the last timer tick.
        final r = ProcessInfo.currentRss;
        if (r > peak) peak = r;
      }
      return peak - baseline;
    }

    final stringDelta = await measureVariant(() async {
      final bytes = await benchTextFile.readAsBytes();
      // Caller-side String materialization — this is what from_utf8 /
      // from_file avoid.
      final s = utf8.decode(bytes);
      final prepared = await ingest_session.prepareSourceIngestion(
        collectionId: 'bench-heap-string',
        content: s,
        metadata: null,
        name: 'bench-heap-string',
        strategy: ingest_session.IngestStrategy.recursive,
        maxChars: maxChunkChars,
        overlapChars: overlapChars,
      );
      await drainAndDelete(prepared, 'bench-heap-string');
    });
    final utf8Delta = await measureVariant(() async {
      final bytes = await benchTextFile.readAsBytes();
      final prepared = await ingest_session.prepareSourceIngestionFromUtf8(
        collectionId: 'bench-heap-utf8',
        contentBytes: bytes,
        metadata: null,
        name: 'bench-heap-utf8',
        strategy: ingest_session.IngestStrategy.recursive,
        maxChars: maxChunkChars,
        overlapChars: overlapChars,
      );
      await drainAndDelete(prepared, 'bench-heap-utf8');
    });
    final fileDelta = await measureVariant(() async {
      final prepared = await ingest_session.prepareSourceIngestionFromFile(
        collectionId: 'bench-heap-file',
        filePath: benchTextFile.path,
        metadata: null,
        name: 'bench-heap-file',
        strategyHint: ingest_session.IngestStrategy.recursive,
        maxChars: maxChunkChars,
        overlapChars: overlapChars,
      );
      await drainAndDelete(prepared, 'bench-heap-file');
    });
    return IngestHeapBenchResult(
      docBytes: docUtf8Bytes,
      stringPathPeakDeltaBytes: stringDelta,
      utf8PathPeakDeltaBytes: utf8Delta,
      filePathPeakDeltaBytes: fileDelta,
    );
  } finally {
    // Tear down the bench pool/files and restore the caller's pool even when
    // a variant throws; the previous fall-through cleanup leaked the pool
    // and temp files on failure.
    await closeDbPool();
    if (await benchFile.exists()) {
      await benchFile.delete();
    }
    if (await benchTextFile.exists()) {
      await benchTextFile.delete();
    }
    if (restoreDbPath != null) {
      await initDbPool(dbPath: restoreDbPath, maxSize: 4);
    }
  }
}