addSourceWithChunking method

Future<SourceAddResult> addSourceWithChunking(
  String content, {
  String? metadata,
  String? name,
  String? filePath,
  ChunkingStrategy? strategy,
  Duration? chunkDelay,
  void Function(int done, int total)? onProgress,
})

Add a source document with automatic chunking and embedding.

The document is:

  1. Split into chunks based on file type (auto-detected from filePath)
  2. Each chunk is embedded (micro-batch streaming, kIngestionBatchSize at a time)
  3. Source and chunks are incrementally stored in DB

If filePath is provided, chunking strategy is auto-detected:

  • .md, .markdown → Markdown-aware chunking (preserves headers, code blocks)
  • Other files → Default recursive chunking

Memory safety: Uses streaming micro-batch pipeline instead of loading all embeddings into memory. Each batch of kIngestionBatchSize chunks is embedded, saved to DB, then released — keeping memory usage flat.

chunkDelay controls the yield duration between batches (default: 10ms). This allows GC to run and prevents thermal throttling on mobile devices.

Implementation

/// Adds a source document with automatic chunking and embedding.
///
/// The content is split into chunks (Markdown-aware for `.md`/`.markdown`
/// paths, recursive otherwise, unless [strategy] is given), embedded in
/// micro-batches of `kIngestionBatchSize`, and committed to the DB
/// incrementally. Returns a [SourceAddResult] describing the stored source,
/// including whether an existing duplicate was skipped or resumed.
///
/// On any failure mid-ingest, all chunks written for this source are rolled
/// back and the source is marked `failed` before the error is rethrown.
Future<SourceAddResult> addSourceWithChunking(
  String content, {
  String? metadata,
  String? name,
  String? filePath,
  ChunkingStrategy? strategy,
  Duration? chunkDelay,
  void Function(int done, int total)? onProgress,
}) async {
  // 1. Determine chunking strategy.
  //    An explicit `strategy` wins; otherwise auto-detect from the file
  //    extension: .md/.markdown -> markdown chunking, anything else ->
  //    recursive chunking.
  final effectiveStrategy =
      strategy ??
      (filePath != null &&
              (filePath.endsWith('.md') || filePath.endsWith('.markdown'))
          ? ChunkingStrategy.markdown
          : ChunkingStrategy.recursive);

  // 2. Create source row first for crash recovery visibility.
  // This ensures unfinished ingests remain visible as pending/failed states.
  final sourceEntry = await rust_rag.addSourceInCollection(
    collectionId: collectionId,
    content: content,
    metadata: metadata,
    name: name ?? filePath, // Use available identifier
  );
  final sourceId = sourceEntry.sourceId;
  // Tracks whether we are resuming a previously-seen source, so the final
  // result message can report "resumed" instead of the plain insert message.
  var resumedExistingSource = sourceEntry.isDuplicate;

  // 3. Attempt to claim source for ingestion.
  //    Only pending/failed sources can transition to processing.
  var claimed = await rust_rag.claimSourceForIngestion(sourceId: sourceId);
  if (!claimed) {
    // Claim refused: the source is in a non-claimable state. Inspect its
    // status and existing chunk count to decide how to handle the duplicate.
    final status = await rust_rag.getSourceStatus(sourceId: sourceId);
    final existingChunkCount = await rust_rag.getSourceChunkCount(
      sourceId: sourceId,
    );
    switch (decideDuplicateSourceIngestion(
      status: status,
      chunkCount: existingChunkCount,
    )) {
      case DuplicateSourceIngestionDecision.skipCompleted:
        // Source is already fully ingested: re-assert 'completed' and
        // return a duplicate result with zero newly-added chunks.
        await rust_rag.updateSourceStatus(
          sourceId: sourceId,
          status: 'completed',
        );
        return SourceAddResult(
          sourceId: sourceId.toInt(),
          isDuplicate: true,
          chunkCount: 0,
          message: sourceEntry.message,
        );
      case DuplicateSourceIngestionDecision.alreadyInProgress:
        // Another ingestion currently holds the claim; do not interfere.
        return SourceAddResult(
          sourceId: sourceId.toInt(),
          isDuplicate: true,
          chunkCount: 0,
          message: 'Source ingestion already in progress',
        );
      case DuplicateSourceIngestionDecision.resetAndResume:
        // Stale or failed prior ingest: drop its partial chunks, then
        // retry the claim so this call can redo the ingestion.
        resumedExistingSource = true;
        await rust_rag.clearSourceChunks(sourceId: sourceId);
        claimed = await rust_rag.claimSourceForIngestion(sourceId: sourceId);
        if (!claimed) {
          // Lost a race: another ingester claimed the source between our
          // clear and re-claim. Treat it the same as alreadyInProgress.
          return SourceAddResult(
            sourceId: sourceId.toInt(),
            isDuplicate: true,
            chunkCount: 0,
            message: 'Source ingestion already in progress',
          );
        }
    }
  }

  // Duplicate resume path: defensively clear stale partial chunks left by
  // older versions that failed before strict rollback was introduced.
  if (sourceEntry.isDuplicate && claimed) {
    resumedExistingSource = true;
    final staleChunkCount = await rust_rag.getSourceChunkCount(
      sourceId: sourceId,
    );
    if (staleChunkCount > 0) {
      await rust_rag.clearSourceChunks(sourceId: sourceId);
    }
  }

  // Mark dirty only when ingestion will actually run.
  // NOTE: every unclaimed path above already returned, so this throw is a
  // defensive guard rather than an expected branch.
  if (!claimed) {
    throw StateError('Unable to claim source $sourceId for ingestion.');
  }
  await _markDirty();

  // 4. Streaming micro-batch ingestion pipeline
  //    - Embed kIngestionBatchSize chunks at a time
  //    - Save to DB immediately (incremental commit)
  //    - Release memory and yield to event loop
  final effectiveDelay = chunkDelay ?? const Duration(milliseconds: 10);
  int savedCount = 0;

  try {
    // Chunking runs on main isolate (no ONNX double-loading).
    debugPrint('[SourceRagService] Chunking content...');
    // Element type depends on strategy: StructuredChunk for markdown,
    // SemanticChunk for recursive — hence the dynamic list plus the
    // per-strategy casts in the loop below.
    final List<dynamic> rawChunks;
    if (effectiveStrategy == ChunkingStrategy.markdown) {
      rawChunks = markdownChunk(text: content, maxChars: maxChunkChars);
    } else {
      rawChunks = semanticChunkWithOverlap(
        text: content,
        maxChars: maxChunkChars,
        overlapChars: overlapChars,
      );
    }

    final total = rawChunks.length;
    debugPrint(
      '[SourceRagService] Got $total chunks. Starting streaming ingestion...',
    );

    // Process chunks in batches of kIngestionBatchSize; clamp handles the
    // final partial batch.
    for (var i = 0; i < total; i += kIngestionBatchSize) {
      final batchEnd = (i + kIngestionBatchSize).clamp(0, total);
      final batchChunks = <ChunkData>[];

      for (var j = i; j < batchEnd; j++) {
        final rawChunk = rawChunks[j];
        String contentStr;
        String chunkType;
        int chunkIdx;
        int startPos;
        int endPos;

        if (effectiveStrategy == ChunkingStrategy.markdown) {
          final c = rawChunk as StructuredChunk;
          contentStr = c.content;
          // Markdown chunks encode their header path into chunkType as
          // 'type|headerPath' when a header path is present.
          chunkType = c.headerPath.isNotEmpty
              ? '${c.chunkType}|${c.headerPath}'
              : c.chunkType;
          chunkIdx = c.index;
          startPos = c.startPos;
          endPos = c.endPos;
        } else {
          final c = rawChunk as SemanticChunk;
          contentStr = c.content;
          chunkType = c.chunkType;
          chunkIdx = c.index;
          startPos = c.startPos;
          endPos = c.endPos;
        }

        // Embed using the already-loaded ONNX session (no double-loading)
        // NOTE(review): chunks are embedded one at a time — presumably
        // EmbeddingService.embed has no batch API; confirm.
        final embedding = await EmbeddingService.embed(contentStr);

        batchChunks.add(
          ChunkData(
            content: contentStr,
            chunkIndex: chunkIdx,
            startPos: startPos,
            endPos: endPos,
            chunkType: chunkType,
            embedding: Float32List.fromList(embedding),
          ),
        );
      }

      // Incremental DB save — batch is committed and can be released
      await rust_rag.addChunks(sourceId: sourceId, chunks: batchChunks);
      savedCount += batchChunks.length;
      onProgress?.call(savedCount, total);

      // Yield to event loop: allows GC, UI updates, and thermal cooldown
      // (skipped after the final batch — no point delaying the return).
      if (i + kIngestionBatchSize < total) {
        await Future.delayed(effectiveDelay);
      }
    }

    await rust_rag.updateSourceStatus(
      sourceId: sourceId,
      status: 'completed',
    );

    return SourceAddResult(
      sourceId: sourceId.toInt(),
      isDuplicate: sourceEntry.isDuplicate,
      chunkCount: savedCount,
      message: resumedExistingSource
          ? 'Source resumed and completed'
          : sourceEntry.message,
    );
  } catch (e) {
    // Strict rollback: remove any chunks written during this attempt and
    // mark the source failed. Both steps are best-effort (errors swallowed)
    // so the original failure is what propagates via rethrow.
    try {
      await rust_rag.clearSourceChunks(sourceId: sourceId);
    } catch (_) {}
    try {
      await rust_rag.updateSourceStatus(sourceId: sourceId, status: 'failed');
    } catch (_) {}
    rethrow;
  }
}