workOnTask method
Future<void> workOnTask(TaskExecutionContext context)
override
Implementation
/// Chunks [source], stores the chunks, and schedules the follow-up tasks.
///
/// While [chunkingFinished] is `false`, [source] is written to a scratch file
/// under `tmp/source/` and fed through an [IChunker]. The chunk stream is
/// consumed in batches; for every batch each chunk whose index is greater than
/// [lastChunk] is passed to [addChunk], the resulting strings are grouped by
/// [embedBatchSize], and one [TaskEmbed] is scheduled per group. After a batch
/// has been scheduled, [lastChunk] is set to the index of its last chunk.
///
/// If [context] reports no time remaining before a batch is started, the
/// method returns without calling `context.complete()` and without setting
/// [chunkingFinished] (presumably so the task manager runs it again; confirm
/// against the scheduler).
///
/// Once chunking is finished, a [TaskDistill] for level-of-detail 0 is
/// scheduled when [shouldScheduleDistillation] is `true`, and the task is
/// marked complete.
@override
Future<void> workOnTask(TaskExecutionContext context) async {
  if (!chunkingFinished) {
    // The scratch file name is derived from the task id and the chunking
    // parameters.
    // NOTE(review): this file is never deleted here; confirm whether cleanup
    // happens elsewhere.
    File file = File(
      "tmp/source/${base64UrlEncode("$taskId.$maxChunkSize.$maxPostOverlap".encodedUtf8)}.txt",
    );
    await source.writeTo(file);

    IChunker chunker = IChunker(
      maxChunkSize: maxChunkSize,
      maxPostOverlap: maxPostOverlap,
    );
    Stream<List<Chunk>> stream = chunker
        .chunkTextFile(file)
        .accumulateBy(chunkBatchSize, (w) => 1, maxAmount: chunkBatchSize);

    await for (List<Chunk> i in stream) {
      if (!context.hasTimeRemaining) {
        // Out of time: leave without completing. [lastChunk] marks how far
        // we got.
        return;
      }
      if (i.isEmpty) {
        continue;
      }

      // Copy the field to a local so the null check promotes and no `!` is
      // needed.
      final processedUpTo = lastChunk;
      List<Future<String>> work = [];
      for (Chunk chunk in i) {
        if (processedUpTo != null && chunk.index <= processedUpTo) {
          // Already handled by an earlier invocation.
          continue;
        }
        work.add(addChunk(chunk));
      }

      if (work.isEmpty) {
        // Every chunk in this batch was already processed. Do NOT touch
        // [lastChunk] here: overwriting it with this batch's (lower) last
        // index would move the resume marker backwards and cause the
        // following, already-processed batches to be added and embedded
        // a second time.
        continue;
      }

      List<List<String>> toEmbed = await Stream.fromFutures(work)
          .accumulateBy(embedBatchSize, (w) => 1, maxAmount: embedBatchSize)
          .toList();

      // NOTE(review): `index` restarts at 0 for every chunk batch, so the
      // same embed task ids are produced for different batches. Confirm the
      // task manager does not de-duplicate or overwrite by task id.
      await Future.wait(
        toEmbed.mapIndexed(
          (batch, index) => FireRag.instance.taskManager.schedule(
            TaskEmbed(
              bucket: bucket,
              taskId: "$taskId.embed.$index",
              collection: destinationCollection,
              chunks: batch,
            ),
          ),
        ),
      );

      lastChunk = i.last.index;
    }

    chunkingFinished = true;
  }

  if (shouldScheduleDistillation) {
    await FireRag.instance.taskManager.schedule(
      TaskDistill(
        bucket: bucket,
        embedBatchSize: embedBatchSize,
        taskId: "$taskId.distill.L0",
        record: record,
        lod: 0,
        collection: destinationCollection,
        targetOutputSize: maxChunkSize,
        factor: distillationFactor!,
        size: chunkCount,
      ),
    );
  }

  context.complete();
}