benchmarkIngestLatency static method

Future<IngestLatencyBenchResult> benchmarkIngestLatency({
  1. int targetBytes = 1 * 1024 * 1024,
  2. int embeddingDim = 384,
  3. int maxChunkChars = 1500,
  4. int overlapChars = 0,
  5. int batchSize = 16,
  6. int warmupRuns = 2,
  7. int measuredRuns = 5,
  8. String? restoreDbPath,
  9. String? dbPathOverride,
})

Measures wall-clock latency of the full prepare → drain → finalize cycle across the three ingest entrypoint variants; each run's temporary source is deleted afterwards, outside the timed window. Stub embeddings (a zeroed Float32List) keep the measurement focused on the FFI + Rust pipeline cost rather than ONNX inference time.

Each variant runs `warmupRuns` warmup iterations followed by `measuredRuns` measured iterations. Reports p50 / p95 / mean / stdev in milliseconds per variant.

Implementation

/// Measures wall-clock latency of the full prepare → drain → finalize
/// cycle across the three ingest entrypoint variants (String, UTF-8
/// bytes, file path).
///
/// Stub embeddings (a zeroed [Float32List]) keep the measurement focused
/// on the FFI + Rust pipeline cost rather than ONNX inference time.
/// Each variant runs [warmupRuns] warmup iterations followed by
/// [measuredRuns] measured iterations; the per-run source deletion
/// happens outside the timed window.
///
/// [dbPathOverride] points the benchmark at a specific scratch database
/// path; otherwise a file under the documents directory is used.
/// [restoreDbPath], when non-null, re-initializes the pool against that
/// path after the benchmark tears its own pool down.
static Future<IngestLatencyBenchResult> benchmarkIngestLatency({
  int targetBytes = 1 * 1024 * 1024,
  int embeddingDim = 384,
  int maxChunkChars = 1500,
  int overlapChars = 0,
  int batchSize = 16,
  int warmupRuns = 2,
  int measuredRuns = 5,
  String? restoreDbPath,
  String? dbPathOverride,
}) async {
  final content = _generateBenchDoc(targetBytes);

  final benchDbPath = dbPathOverride ??
      "${(await getApplicationDocumentsDirectory()).path}/ingest_latency_bench.sqlite";
  final benchFile = File(benchDbPath);
  if (await benchFile.exists()) {
    await benchFile.delete();
  }
  final benchTextFile = File('$benchDbPath.txt');
  if (await benchTextFile.exists()) {
    await benchTextFile.delete();
  }
  // writeAsString encodes as UTF-8 by default, so the on-disk file is the
  // canonical UTF-8 form of the document.
  await benchTextFile.writeAsString(content, flush: true);
  // Actual UTF-8 byte count. (content.length counts UTF-16 code units,
  // which only matches the byte count for pure-ASCII documents.)
  final docUtf8Bytes = await benchTextFile.length();
  // Read the UTF-8 bytes back from disk rather than using
  // content.codeUnits, which is only byte-correct for ASCII/Latin-1.
  final contentBytes = await benchTextFile.readAsBytes();

  await initDbPool(dbPath: benchDbPath, maxSize: 4);
  await initDb();
  await source_rag.initSourceDb();

  final stubEmbedding = Float32List(embeddingDim);

  // One full ingest cycle for [variant]; returns elapsed milliseconds.
  // The stopwatch covers prepare → drain → finalize → dispose; the
  // source deletion below is intentionally outside the timed window.
  Future<double> runOnce(_LatencyVariant variant, int iteration) async {
    final collectionId = 'bench-latency-${variant.name}-$iteration';
    final sw = Stopwatch()..start();
    late ingest_session.PreparedIngestion prepared;
    switch (variant) {
      case _LatencyVariant.string:
        prepared = await ingest_session.prepareSourceIngestion(
          collectionId: collectionId,
          content: content,
          metadata: null,
          name: collectionId,
          strategy: ingest_session.IngestStrategy.recursive,
          maxChars: maxChunkChars,
          overlapChars: overlapChars,
        );
        break;
      case _LatencyVariant.utf8:
        prepared = await ingest_session.prepareSourceIngestionFromUtf8(
          collectionId: collectionId,
          contentBytes: contentBytes,
          metadata: null,
          name: collectionId,
          strategy: ingest_session.IngestStrategy.recursive,
          maxChars: maxChunkChars,
          overlapChars: overlapChars,
        );
        break;
      case _LatencyVariant.file:
        prepared = await ingest_session.prepareSourceIngestionFromFile(
          collectionId: collectionId,
          filePath: benchTextFile.path,
          metadata: null,
          name: collectionId,
          strategyHint: ingest_session.IngestStrategy.recursive,
          maxChars: maxChunkChars,
          overlapChars: overlapChars,
        );
        break;
    }
    final session = prepared.session;
    if (session == null) {
      throw StateError(
        'benchmarkIngestLatency: prepare returned no session for variant '
        '${variant.name} (state=${prepared.state}).',
      );
    }
    try {
      // Drain the chunk queue, committing zeroed stub embeddings in
      // batches until every chunk is saved (or the queue runs dry).
      var saved = 0;
      while (saved < prepared.totalChunks) {
        final batch = await session.takeEmbeddingBatch(batchSize: batchSize);
        if (batch.isEmpty) break;
        final embeddings = batch
            .map(
              (req) => ingest_session.ChunkEmbedding(
                chunkIndex: req.chunkIndex,
                embedding: stubEmbedding,
              ),
            )
            .toList(growable: false);
        saved += await session.commitEmbeddings(embeddings: embeddings);
      }
      await session.finalize();
    } finally {
      await session.dispose();
    }
    sw.stop();
    // Remove the source so the next iteration starts from a clean slate;
    // deliberately excluded from the measured latency.
    await source_rag.deleteSourceInCollection(
      collectionId: collectionId,
      sourceId: prepared.sourceId,
    );
    return sw.elapsedMicroseconds / 1000.0;
  }

  // Runs warmups (negative iteration ids keep their collection ids
  // distinct from measured runs), then collects and summarizes samples.
  Future<IngestLatencyStats> measureVariant(_LatencyVariant variant) async {
    for (var i = 0; i < warmupRuns; i++) {
      await runOnce(variant, -1 - i);
    }
    final samples = <double>[];
    for (var i = 0; i < measuredRuns; i++) {
      samples.add(await runOnce(variant, i));
    }
    samples.sort();
    final mean = samples.reduce((a, b) => a + b) / samples.length;
    return IngestLatencyStats(
      p50Ms: _percentileFromSorted(samples, 0.5),
      p95Ms: _percentileFromSorted(samples, 0.95),
      meanMs: mean,
      stdevMs: _stdDev(samples, mean),
      samples: List.unmodifiable(samples),
    );
  }

  try {
    final stringStats = await measureVariant(_LatencyVariant.string);
    final utf8Stats = await measureVariant(_LatencyVariant.utf8);
    final fileStats = await measureVariant(_LatencyVariant.file);

    return IngestLatencyBenchResult(
      docBytes: docUtf8Bytes,
      warmupRuns: warmupRuns,
      measuredRuns: measuredRuns,
      stringPath: stringStats,
      utf8Path: utf8Stats,
      filePath: fileStats,
    );
  } finally {
    // Tear down even when a run throws, so scratch files do not
    // accumulate and the caller's pool can be restored.
    await closeDbPool();
    if (await benchFile.exists()) {
      await benchFile.delete();
    }
    if (await benchTextFile.exists()) {
      await benchTextFile.delete();
    }
    if (restoreDbPath != null) {
      await initDbPool(dbPath: restoreDbPath, maxSize: 4);
    }
  }
}