addSourceWithChunking method
Add a source document with automatic chunking and embedding.
The document is:
- Split into chunks based on file type (auto-detected from `filePath`)
- Each chunk is embedded (micro-batch streaming, `kIngestionBatchSize` at a time)
- Source and chunks are incrementally stored in the DB
If `filePath` is provided, the chunking strategy is auto-detected:
- `.md`, `.markdown` → Markdown-aware chunking (preserves headers, code blocks)
- Other files → Default recursive chunking
Memory safety: Uses streaming micro-batch pipeline instead of loading
all embeddings into memory. Each batch of kIngestionBatchSize chunks is
embedded, saved to DB, then released — keeping memory usage flat.
chunkDelay controls the yield duration between batches (default: 10ms).
This allows GC to run and prevents thermal throttling on mobile devices.
Implementation
/// Adds a source document with automatic chunking and embedding.
///
/// [content] is split into chunks — Markdown-aware when [filePath] ends in
/// `.md`/`.markdown`, recursive otherwise — unless [strategy] overrides the
/// auto-detection. Each chunk is embedded and committed to the DB in
/// batches of [kIngestionBatchSize], keeping memory usage flat: a batch is
/// embedded, saved, and released before the next one starts.
///
/// Between batches the method yields for [chunkDelay] (default: 10 ms) so
/// GC and UI work can run and to avoid thermal throttling on mobile.
///
/// [onProgress], if provided, is called after each committed batch with the
/// number of chunks saved so far and the total chunk count.
///
/// Throws [StateError] if the source cannot be claimed for ingestion.
/// On any ingestion failure the partial chunks are rolled back, the source
/// is marked `failed`, and the original error is rethrown.
Future<SourceAddResult> addSourceWithChunking(
  String content, {
  String? metadata,
  String? name,
  String? filePath,
  ChunkingStrategy? strategy,
  Duration? chunkDelay,
  void Function(int done, int total)? onProgress,
}) async {
  // 1. Determine chunking strategy: an explicit override wins; otherwise
  // Markdown files get structure-aware chunking.
  final isMarkdownFile = filePath != null &&
      (filePath.endsWith('.md') || filePath.endsWith('.markdown'));
  final effectiveStrategy = strategy ??
      (isMarkdownFile ? ChunkingStrategy.markdown : ChunkingStrategy.recursive);

  // 2. Create the source row first for crash-recovery visibility: an
  // unfinished ingest remains visible as a pending/failed row instead of
  // silently disappearing.
  final sourceEntry = await rust_rag.addSourceInCollection(
    collectionId: collectionId,
    content: content,
    metadata: metadata,
    name: name ?? filePath, // Fall back to the file path as an identifier.
  );
  final sourceId = sourceEntry.sourceId;
  var resumedExistingSource = sourceEntry.isDuplicate;

  // Result returned whenever another ingestion already holds the claim.
  // (Previously this literal was duplicated at two return sites.)
  SourceAddResult alreadyInProgressResult() => SourceAddResult(
        sourceId: sourceId.toInt(),
        isDuplicate: true,
        chunkCount: 0,
        message: 'Source ingestion already in progress',
      );

  // 3. Attempt to claim the source for ingestion.
  // Only pending/failed sources can transition to processing.
  var claimed = await rust_rag.claimSourceForIngestion(sourceId: sourceId);
  if (!claimed) {
    final status = await rust_rag.getSourceStatus(sourceId: sourceId);
    final existingChunkCount =
        await rust_rag.getSourceChunkCount(sourceId: sourceId);
    switch (decideDuplicateSourceIngestion(
      status: status,
      chunkCount: existingChunkCount,
    )) {
      case DuplicateSourceIngestionDecision.skipCompleted:
        // Normalize the status — presumably this repairs rows whose chunks
        // exist but whose status was never flipped to completed.
        // TODO(review): confirm against decideDuplicateSourceIngestion.
        await rust_rag.updateSourceStatus(
          sourceId: sourceId,
          status: 'completed',
        );
        return SourceAddResult(
          sourceId: sourceId.toInt(),
          isDuplicate: true,
          chunkCount: 0,
          message: sourceEntry.message,
        );
      case DuplicateSourceIngestionDecision.alreadyInProgress:
        return alreadyInProgressResult();
      case DuplicateSourceIngestionDecision.resetAndResume:
        resumedExistingSource = true;
        await rust_rag.clearSourceChunks(sourceId: sourceId);
        claimed = await rust_rag.claimSourceForIngestion(sourceId: sourceId);
        if (!claimed) {
          // Lost a race to a concurrent ingestion between clear and claim.
          return alreadyInProgressResult();
        }
    }
  }

  // Duplicate resume path: defensively clear stale partial chunks left by
  // older versions that failed before strict rollback was introduced.
  if (sourceEntry.isDuplicate && claimed) {
    resumedExistingSource = true;
    final staleChunkCount =
        await rust_rag.getSourceChunkCount(sourceId: sourceId);
    if (staleChunkCount > 0) {
      await rust_rag.clearSourceChunks(sourceId: sourceId);
    }
  }

  if (!claimed) {
    // Programmer-error guard: every path above either returned or claimed.
    throw StateError('Unable to claim source $sourceId for ingestion.');
  }
  // Mark dirty only when ingestion will actually run.
  await _markDirty();

  // Converts one raw chunk (StructuredChunk for Markdown, SemanticChunk
  // otherwise) into a ChunkData row with its embedding attached.
  Future<ChunkData> embedChunk(dynamic rawChunk) async {
    final String contentStr;
    final String chunkType;
    final int chunkIdx;
    final int startPos;
    final int endPos;
    if (effectiveStrategy == ChunkingStrategy.markdown) {
      final c = rawChunk as StructuredChunk;
      contentStr = c.content;
      // Fold the header path into the chunk type so section context
      // survives storage.
      chunkType = c.headerPath.isNotEmpty
          ? '${c.chunkType}|${c.headerPath}'
          : c.chunkType;
      chunkIdx = c.index;
      startPos = c.startPos;
      endPos = c.endPos;
    } else {
      final c = rawChunk as SemanticChunk;
      contentStr = c.content;
      chunkType = c.chunkType;
      chunkIdx = c.index;
      startPos = c.startPos;
      endPos = c.endPos;
    }
    // Embed using the already-loaded ONNX session (no double-loading).
    final embedding = await EmbeddingService.embed(contentStr);
    return ChunkData(
      content: contentStr,
      chunkIndex: chunkIdx,
      startPos: startPos,
      endPos: endPos,
      chunkType: chunkType,
      embedding: Float32List.fromList(embedding),
    );
  }

  // 4. Streaming micro-batch ingestion pipeline:
  //    embed kIngestionBatchSize chunks at a time, save to DB immediately
  //    (incremental commit), release memory, and yield to the event loop.
  final effectiveDelay = chunkDelay ?? const Duration(milliseconds: 10);
  var savedCount = 0;
  try {
    // Chunking runs on the main isolate (no ONNX double-loading).
    debugPrint('[SourceRagService] Chunking content...');
    final List<dynamic> rawChunks =
        effectiveStrategy == ChunkingStrategy.markdown
            ? markdownChunk(text: content, maxChars: maxChunkChars)
            : semanticChunkWithOverlap(
                text: content,
                maxChars: maxChunkChars,
                overlapChars: overlapChars,
              );
    final total = rawChunks.length;
    debugPrint(
      '[SourceRagService] Got $total chunks. Starting streaming ingestion...',
    );
    for (var i = 0; i < total; i += kIngestionBatchSize) {
      // Conditional instead of int.clamp, which would type batchEnd as num.
      final next = i + kIngestionBatchSize;
      final batchEnd = next < total ? next : total;
      final batchChunks = <ChunkData>[];
      for (var j = i; j < batchEnd; j++) {
        batchChunks.add(await embedChunk(rawChunks[j]));
      }
      // Incremental DB save — batch is committed and can be released.
      await rust_rag.addChunks(sourceId: sourceId, chunks: batchChunks);
      savedCount += batchChunks.length;
      onProgress?.call(savedCount, total);
      // Yield between batches (not after the last one): allows GC, UI
      // updates, and thermal cooldown.
      if (next < total) {
        await Future.delayed(effectiveDelay);
      }
    }
    await rust_rag.updateSourceStatus(
      sourceId: sourceId,
      status: 'completed',
    );
    return SourceAddResult(
      sourceId: sourceId.toInt(),
      isDuplicate: sourceEntry.isDuplicate,
      chunkCount: savedCount,
      message: resumedExistingSource
          ? 'Source resumed and completed'
          : sourceEntry.message,
    );
  } catch (_) {
    // Strict rollback: never leave partial chunks behind a failed ingest.
    // Cleanup failures are deliberately swallowed so the original error
    // propagates via rethrow.
    try {
      await rust_rag.clearSourceChunks(sourceId: sourceId);
    } catch (_) {}
    try {
      await rust_rag.updateSourceStatus(sourceId: sourceId, status: 'failed');
    } catch (_) {}
    rethrow;
  }
}