addSourceWithChunking method

Future<SourceAddResult> addSourceWithChunking(
  String content, {
  String? metadata,
  String? name,
  String? filePath,
  ChunkingStrategy? strategy,
  Duration? chunkDelay,
  void Function(int done, int total)? onProgress,
})

Add a source document with automatic chunking and embedding.

The document is processed in three steps:

  1. The content is split into chunks based on file type (auto-detected from filePath)
  2. Each chunk is embedded
  3. The source and its chunks are stored in the DB

If strategy is not provided, the chunking strategy is auto-detected from filePath:

  • .md, .markdown → Markdown-aware chunking (preserves headers, code blocks)
  • Other files, or no filePath → Default recursive chunking

Implementation

/// Adds a source document, then chunks and embeds it in a background isolate.
///
/// The source row is created first so that an interrupted ingest stays
/// visible in the database; the chunks produced by the isolate are written
/// afterwards and the source status is set to `'completed'`.
///
/// Parameters:
/// - [content]: full text of the document.
/// - [metadata]: optional metadata stored with the source row.
/// - [name]: optional display name; falls back to [filePath] when omitted.
/// - [filePath]: optional path used to auto-detect the chunking strategy
///   (`.md` / `.markdown`, matched case-insensitively, select
///   [ChunkingStrategy.markdown]; anything else selects
///   [ChunkingStrategy.recursive]).
/// - [strategy]: explicit chunking strategy; overrides auto-detection.
/// - [chunkDelay]: accepted for API compatibility; this implementation does
///   not read it.
/// - [onProgress]: called with `(done, total)` for every progress message
///   sent by the isolate.
///
/// Returns a [SourceAddResult]. When the source is already present in the
/// collection and has chunks, the pipeline is skipped and the result has
/// `isDuplicate: true` and `chunkCount: 0`.
///
/// Throws an [Exception] when the embedding model file does not exist, when
/// the isolate reports an error, raises an uncaught error, or exits without
/// delivering a result. Failures after the source row was created mark the
/// source as `'failed'` (best effort) before being rethrown.
Future<SourceAddResult> addSourceWithChunking(
  String content, {
  String? metadata,
  String? name,
  String? filePath,
  ChunkingStrategy? strategy,
  Duration? chunkDelay,
  void Function(int done, int total)? onProgress,
}) async {
  // 1. Determine model path for isolate
  // Use stored modelPath if available, otherwise try to guess (fallback)
  // NOTE: In production usage via RagEngine, modelPath should always be provided.
  String effectiveModelPath = modelPath ?? '';
  if (effectiveModelPath.isEmpty) {
    final directory = await getApplicationDocumentsDirectory();
    effectiveModelPath = '${directory.path}/bge-m3-quantized-int8.onnx';
    debugPrint(
      '[SourceRagService] Warning: modelPath not set, using fallback: $effectiveModelPath',
    );
  }

  // Check if file exists to avoid isolate crash
  if (!await File(effectiveModelPath).exists()) {
    throw Exception(
      "Embedding model file not found at: $effectiveModelPath. Ensure RagEngine is initialized or modelPath is correct.",
    );
  }

  // 2. Prepare request
  // Compare the extension case-insensitively so that e.g. `README.MD` is
  // still recognised as markdown.
  final lowerCasePath = filePath?.toLowerCase();
  final looksLikeMarkdown =
      lowerCasePath != null &&
      (lowerCasePath.endsWith('.md') || lowerCasePath.endsWith('.markdown'));
  final effectiveStrategy =
      strategy ??
      (looksLikeMarkdown
          ? ChunkingStrategy.markdown
          : ChunkingStrategy.recursive);

  // 3. Create source row first for crash recovery visibility.
  // This ensures unfinished ingests remain visible as pending/failed states.
  final sourceEntry = await rust_rag.addSourceInCollection(
    collectionId: collectionId,
    content: content,
    metadata: metadata,
    name: name ?? filePath, // Use available identifier
  );
  final sourceId = sourceEntry.sourceId;

  // If already indexed in this collection, skip expensive embed/chunk pipeline.
  if (sourceEntry.isDuplicate) {
    final existingChunkCount = await rust_rag.getSourceChunkCount(
      sourceId: sourceId,
    );
    if (existingChunkCount > 0) {
      await rust_rag.updateSourceStatus(
        sourceId: sourceId,
        status: 'completed',
      );
      return SourceAddResult(
        sourceId: sourceId.toInt(),
        isDuplicate: true,
        chunkCount: 0,
        message: sourceEntry.message,
      );
    }
  }

  // Mark dirty early so abrupt termination still leaves a recovery hint.
  await _markDirty();

  // 4. Process in background isolate with progress reporting
  debugPrint('[SourceRagService] Offloading processing to isolate...');

  final receivePort = ReceivePort();
  final completer = Completer<_ProcessingResult>();

  // Completes the completer with an error at most once; later terminal
  // messages (e.g. the exit notification after an error) are ignored.
  void fail(Object error) {
    if (!completer.isCompleted) {
      completer.completeError(error);
    }
  }

  // Add request with SendPort
  final isolateRequest = _IsolateRequest(
    content: content,
    modelPath: effectiveModelPath,
    maxChars: maxChunkChars,
    overlapChars: overlapChars,
    strategy: effectiveStrategy,
    sendPort: receivePort.sendPort,
  );

  try {
    // Route uncaught errors and the exit notification to the same port so
    // that a crashing or silently exiting isolate cannot leave this call
    // waiting forever.
    await Isolate.spawn(
      _processContentInIsolate,
      isolateRequest,
      onError: receivePort.sendPort,
      onExit: receivePort.sendPort,
    );

    // The port buffers messages until a listener is attached, so nothing
    // sent during spawn is lost. Listening right before awaiting the
    // completer guarantees an error always has a handler.
    receivePort.listen((message) {
      if (message is _ProcessingResult) {
        // Final result
        if (!completer.isCompleted) {
          completer.complete(message);
        }
      } else if (message == null) {
        // Exit notification: only an error if no result arrived before it.
        fail(
          Exception('Processing isolate exited without producing a result.'),
        );
      } else if (message is List &&
          message.length == 2 &&
          message[0] is int &&
          message[1] is int) {
        // Progress update: [done, total]
        onProgress?.call(message[0] as int, message[1] as int);
      } else if (message is List &&
          message.isNotEmpty &&
          message[0] == 'error') {
        // Error reported explicitly by the isolate entry point.
        fail(
          Exception(message.length > 1 ? message[1] : 'Unknown isolate error'),
        );
      } else if (message is List &&
          message.length == 2 &&
          message[0] is String) {
        // Uncaught isolate error delivered via onError:
        // [error description, stack trace description or null].
        fail(
          Exception(
            'Unhandled error in processing isolate: ${message[0]}\n${message[1]}',
          ),
        );
      }
    });

    final result = await completer.future;

    debugPrint(
      '[SourceRagService] Isolate finished. Got ${result.chunks.length} chunks.',
    );

    final chunks = result.chunks;

    // 5. Save chunks to DB.
    await rust_rag.addChunks(sourceId: sourceId, chunks: chunks);
    await rust_rag.updateSourceStatus(
      sourceId: sourceId,
      status: 'completed',
    );

    return SourceAddResult(
      sourceId: sourceId.toInt(),
      isDuplicate: false,
      chunkCount: chunks.length,
      message: sourceEntry.isDuplicate
          ? 'Source resumed and completed'
          : sourceEntry.message,
    );
  } catch (e) {
    // Best effort: the original failure is what the caller needs to see, so
    // a failing status update is logged and otherwise ignored.
    try {
      await rust_rag.updateSourceStatus(sourceId: sourceId, status: 'failed');
    } catch (statusError) {
      debugPrint(
        '[SourceRagService] Could not mark source $sourceId as failed: $statusError',
      );
    }
    rethrow;
  } finally {
    // Always release the port, on success and on every failure path.
    receivePort.close();
  }
}