benchmarkIngestHeap static method

Future<IngestHeapBenchResult> benchmarkIngestHeap({
  int targetBytes = 4 * 1024 * 1024,
  int embeddingDim = 384,
  int maxChunkChars = 1500,
  int overlapChars = 0,
  int batchSize = 16,
  String? restoreDbPath,
  String? dbPathOverride,
})

Measure peak process RSS during each ingest entrypoint variant on the realistic caller pattern:

  • String path: caller does readAsBytes → utf8.decode → addDocument.
  • UTF-8 path: caller does readAsBytes → addDocumentUtf8.
  • File path: caller passes the path only → addDocumentFromFile.

Each variant runs in isolation with a fresh DB collection. A 5 ms RSS sampler runs in parallel and tracks the peak observed during the operation. Reports peak - baseline per variant.

Throws a StateError if ProcessInfo.currentRss is unsupported on the current platform.

Implementation

/// Measures the peak process RSS delta for each ingest entrypoint variant
/// (String content, raw UTF-8 bytes, file path) on the realistic caller
/// pattern, using a fresh benchmark DB collection per variant.
///
/// A 5 ms RSS sampler runs alongside each variant and records the peak
/// observed; the reported value is `peak - baseline` per variant.
///
/// Throws [StateError] if [ProcessInfo.currentRss] is unsupported on the
/// current platform. The benchmark DB and text files are removed — and
/// [restoreDbPath] reopened, when provided — even if a variant fails.
static Future<IngestHeapBenchResult> benchmarkIngestHeap({
  int targetBytes = 4 * 1024 * 1024,
  int embeddingDim = 384,
  int maxChunkChars = 1500,
  int overlapChars = 0,
  int batchSize = 16,
  String? restoreDbPath,
  String? dbPathOverride,
}) async {
  if (ProcessInfo.currentRss < 0) {
    throw StateError(
      'benchmarkIngestHeap: ProcessInfo.currentRss is unsupported on this platform',
    );
  }

  final content = _generateBenchDoc(targetBytes);
  // Report the actual UTF-8 byte count, not the UTF-16 code-unit count
  // (`content.length`); the two differ as soon as the doc contains any
  // non-ASCII character. Encoding happens before any measurement starts,
  // so the temporary encoded copy cannot skew the variant deltas.
  final docUtf8Bytes = utf8.encode(content).length;

  final benchDbPath = dbPathOverride ??
      "${(await getApplicationDocumentsDirectory()).path}/ingest_heap_bench.sqlite";
  final benchFile = File(benchDbPath);
  if (await benchFile.exists()) {
    await benchFile.delete();
  }
  final benchTextFile = File('$benchDbPath.txt');
  if (await benchTextFile.exists()) {
    await benchTextFile.delete();
  }
  await benchTextFile.writeAsString(content, flush: true);

  await initDbPool(dbPath: benchDbPath, maxSize: 4);
  await initDb();
  await source_rag.initSourceDb();

  final stubEmbedding = Float32List(embeddingDim);

  // Drains every pending embedding batch with a stub vector, finalizes the
  // session, then deletes the ingested source so each variant sees an
  // identical (empty) collection state. Returns the total chunk count.
  Future<int> drainAndDelete(
    ingest_session.PreparedIngestion prepared,
    String collectionId,
  ) async {
    final session = prepared.session;
    if (session == null) {
      throw StateError(
        'benchmarkIngestHeap: prepare returned no session '
        '(state=${prepared.state}); benchmark requires a fresh collection.',
      );
    }
    try {
      var saved = 0;
      while (saved < prepared.totalChunks) {
        final batch = await session.takeEmbeddingBatch(batchSize: batchSize);
        if (batch.isEmpty) break;
        final embeddings = batch
            .map(
              (req) => ingest_session.ChunkEmbedding(
                chunkIndex: req.chunkIndex,
                embedding: stubEmbedding,
              ),
            )
            .toList(growable: false);
        saved += await session.commitEmbeddings(embeddings: embeddings);
      }
      await session.finalize();
    } finally {
      // Dispose even when finalize/commit throws so session resources are
      // released before the next variant runs.
      await session.dispose();
    }
    await source_rag.deleteSourceInCollection(
      collectionId: collectionId,
      sourceId: prepared.sourceId,
    );
    return prepared.totalChunks;
  }

  // Runs [runVariant] with a 5 ms periodic RSS sampler and returns
  // peak - baseline. A short settle delay lets any prior GC activity
  // subside before the baseline is taken.
  //
  // NOTE(review): the sampler fires on the event loop, so long synchronous
  // stretches inside the variant can suppress samples; the final post-run
  // reading partially compensates for that.
  Future<int> measureVariant(Future<void> Function() runVariant) async {
    await Future.delayed(const Duration(milliseconds: 50));
    final baseline = ProcessInfo.currentRss;
    var peak = baseline;
    final sampler = Timer.periodic(const Duration(milliseconds: 5), (_) {
      final r = ProcessInfo.currentRss;
      if (r > peak) peak = r;
    });
    try {
      await runVariant();
    } finally {
      sampler.cancel();
      // One last reading in case the spike landed between sampler ticks.
      final r = ProcessInfo.currentRss;
      if (r > peak) peak = r;
    }
    return peak - baseline;
  }

  final int stringDelta;
  final int utf8Delta;
  final int fileDelta;
  try {
    stringDelta = await measureVariant(() async {
      final bytes = await benchTextFile.readAsBytes();
      // Caller-side String materialization — this is what from_utf8 / from_file avoid.
      final s = utf8.decode(bytes);
      final prepared = await ingest_session.prepareSourceIngestion(
        collectionId: 'bench-heap-string',
        content: s,
        metadata: null,
        name: 'bench-heap-string',
        strategy: ingest_session.IngestStrategy.recursive,
        maxChars: maxChunkChars,
        overlapChars: overlapChars,
      );
      await drainAndDelete(prepared, 'bench-heap-string');
    });

    utf8Delta = await measureVariant(() async {
      final bytes = await benchTextFile.readAsBytes();
      final prepared = await ingest_session.prepareSourceIngestionFromUtf8(
        collectionId: 'bench-heap-utf8',
        contentBytes: bytes,
        metadata: null,
        name: 'bench-heap-utf8',
        strategy: ingest_session.IngestStrategy.recursive,
        maxChars: maxChunkChars,
        overlapChars: overlapChars,
      );
      await drainAndDelete(prepared, 'bench-heap-utf8');
    });

    fileDelta = await measureVariant(() async {
      final prepared = await ingest_session.prepareSourceIngestionFromFile(
        collectionId: 'bench-heap-file',
        filePath: benchTextFile.path,
        metadata: null,
        name: 'bench-heap-file',
        strategyHint: ingest_session.IngestStrategy.recursive,
        maxChars: maxChunkChars,
        overlapChars: overlapChars,
      );
      await drainAndDelete(prepared, 'bench-heap-file');
    });
  } finally {
    // Always tear down the benchmark DB and restore the caller's pool,
    // even when a variant throws — otherwise a failed run leaks temp
    // files and leaves the app pointed at the benchmark database.
    await closeDbPool();
    if (await benchFile.exists()) {
      await benchFile.delete();
    }
    if (await benchTextFile.exists()) {
      await benchTextFile.delete();
    }
    if (restoreDbPath != null) {
      await initDbPool(dbPath: restoreDbPath, maxSize: 4);
    }
  }

  return IngestHeapBenchResult(
    docBytes: docUtf8Bytes,
    stringPathPeakDeltaBytes: stringDelta,
    utf8PathPeakDeltaBytes: utf8Delta,
    filePathPeakDeltaBytes: fileDelta,
  );
}