benchmarkIngestFfiTraffic static method

Future<IngestFfiBenchResult> benchmarkIngestFfiTraffic({
  int targetBytes = 1 * 1024 * 1024,
  int embeddingDim = 384,
  int maxChunkChars = 1500,
  int overlapChars = 100,
  int batchSize = 16,
  String? restoreDbPath,
  String? dbPathOverride,
})

Measure FFI text-byte traffic for an addDocument ingest, comparing the legacy chain (addSourceInCollection + chunker + addChunks) against the IngestSession chain (prepareSourceIngestion + take_embedding_batch + commit_embeddings) on a deterministic document.

Self-contained: spins up an isolated temp DB, runs both pipelines with stub embeddings (zeroed Float32List of embeddingDim) to keep the measurement focused on FFI text traffic (no ONNX cost, no GC noise), and tears everything down before returning.

The caller is responsible for preserving whatever pool/DB state was active on entry — on success this function restores the original pool configuration; on failure the caller should reinitialise the pool.

Implementation

/// Measures FFI text-byte traffic for an addDocument ingest, comparing the
/// legacy chain (addSourceInCollection + chunker + addChunks) against the
/// IngestSession chain (prepareSourceIngestion + take_embedding_batch +
/// commit_embeddings) on a deterministic generated document of roughly
/// [targetBytes] bytes.
///
/// Self-contained: spins up an isolated temp DB, runs both pipelines with
/// stub embeddings (zeroed Float32List of [embeddingDim]) to keep the
/// measurement focused on FFI text traffic (no ONNX cost, no GC noise), and
/// tears everything down before returning.
///
/// Teardown (pool close + temp DB deletion) now runs even if the benchmark
/// throws mid-way, so a failed run never leaks the temp pool or the bench
/// file. [restoreDbPath] is only re-initialised on success; on failure the
/// caller is still responsible for reinitialising its own pool (documented
/// contract).
static Future<IngestFfiBenchResult> benchmarkIngestFfiTraffic({
  int targetBytes = 1 * 1024 * 1024,
  int embeddingDim = 384,
  int maxChunkChars = 1500,
  int overlapChars = 100,
  int batchSize = 16,
  String? restoreDbPath,
  String? dbPathOverride,
}) async {
  final content = _generateBenchDoc(targetBytes);
  // ASCII-only doc generator → 1 byte per code unit, so length == UTF-8
  // byte count. If we ever switch to multilingual content, use utf8.encode.
  final docUtf8Bytes = content.length;

  final benchDbPath = dbPathOverride ??
      "${(await getApplicationDocumentsDirectory()).path}/ingest_ffi_bench.sqlite";
  final benchFile = File(benchDbPath);
  if (await benchFile.exists()) {
    await benchFile.delete();
  }

  await initDbPool(dbPath: benchDbPath, maxSize: 4);

  final IngestFfiBenchResult result;
  try {
    await initDb();
    await source_rag.initSourceDb();

    // Zeroed embedding reused for every chunk: keeps the run deterministic
    // and removes model-inference cost from the measurement.
    final stubEmbedding = Float32List(embeddingDim);

    // ---- Legacy chain --------------------------------------------------
    ingest_metrics.resetIngestTrafficStats();
    final legacyEntry = await source_rag.addSourceInCollection(
      collectionId: 'bench-legacy',
      content: content,
      metadata: null,
      name: 'bench-legacy',
    );
    final legacySourceId = legacyEntry.sourceId;
    await source_rag.claimSourceForIngestion(sourceId: legacySourceId);
    final legacyChunks = semantic_chunker.semanticChunkWithOverlap(
      text: content,
      maxChars: maxChunkChars,
      overlapChars: overlapChars,
    );
    final legacyChunkData = legacyChunks
        .map(
          (c) => source_rag.ChunkData(
            content: c.content,
            chunkIndex: c.index,
            startPos: c.startPos,
            endPos: c.endPos,
            chunkType: c.chunkType,
            embedding: stubEmbedding,
          ),
        )
        .toList(growable: false);
    // Write chunks in the same batch size the session chain uses so both
    // pipelines cross the FFI boundary a comparable number of times.
    for (var offset = 0;
        offset < legacyChunkData.length;
        offset += batchSize) {
      final end = math.min(offset + batchSize, legacyChunkData.length);
      await source_rag.addChunks(
        sourceId: legacySourceId,
        chunks: legacyChunkData.sublist(offset, end),
      );
    }
    final legacyStats = ingest_metrics.ingestTrafficStats();
    await source_rag.deleteSourceInCollection(
      collectionId: 'bench-legacy',
      sourceId: legacySourceId,
    );

    // ---- IngestSession chain -------------------------------------------
    ingest_metrics.resetIngestTrafficStats();
    final prepared = await ingest_session.prepareSourceIngestion(
      collectionId: 'bench-session',
      content: content,
      metadata: null,
      name: 'bench-session',
      strategy: ingest_session.IngestStrategy.recursive,
      maxChars: maxChunkChars,
      overlapChars: overlapChars,
    );
    final session = prepared.session;
    if (session == null) {
      throw StateError(
        'benchmarkIngestFfiTraffic: prepareSourceIngestion returned no session '
        '(state=${prepared.state}); benchmark requires a fresh DB.',
      );
    }
    try {
      var saved = 0;
      while (saved < prepared.totalChunks) {
        final batch = await session.takeEmbeddingBatch(batchSize: batchSize);
        // Defensive: never spin if the queue drains before totalChunks.
        if (batch.isEmpty) break;
        final embeddings = batch
            .map(
              (req) => ingest_session.ChunkEmbedding(
                chunkIndex: req.chunkIndex,
                embedding: stubEmbedding,
              ),
            )
            .toList(growable: false);
        saved += await session.commitEmbeddings(embeddings: embeddings);
      }
      await session.finalize();
    } finally {
      await session.dispose();
    }
    final sessionStats = ingest_metrics.ingestTrafficStats();
    await source_rag.deleteSourceInCollection(
      collectionId: 'bench-session',
      sourceId: prepared.sourceId,
    );

    result = IngestFfiBenchResult(
      docBytes: docUtf8Bytes,
      chunkCount: legacyChunks.length,
      legacy: legacyStats,
      session: sessionStats,
    );
  } finally {
    // ---- Teardown: runs on success AND failure so a mid-benchmark throw
    // never leaks the open temp pool or the bench DB file. ----------------
    await closeDbPool();
    if (await benchFile.exists()) {
      await benchFile.delete();
    }
  }

  // Restore the caller's pool only on the success path; on failure the
  // caller reinitialises its own pool (see doc comment).
  if (restoreDbPath != null) {
    await initDbPool(dbPath: restoreDbPath, maxSize: 4);
  }
  return result;
}