workOnTask method

@override
Future<void> workOnTask(TaskExecutionContext context)

Overrides the inherited `workOnTask`.

Implementation

/// Splits [source] into chunks, schedules embedding tasks for them, and
/// optionally schedules the first (LOD 0) distillation pass.
///
/// Chunking is resumable. Progress is recorded in [lastChunk] after each
/// chunk batch and in [chunkingFinished] once the whole source is processed.
/// If [context] runs out of time, this method returns early without calling
/// `context.complete()`. A later invocation then skips every chunk whose
/// index is `<= lastChunk`.
///
/// After chunking, a [TaskDistill] is scheduled when
/// [shouldScheduleDistillation] is true. The task is then marked complete.
@override
Future<void> workOnTask(TaskExecutionContext context) async {
  if (!chunkingFinished) {
    // The path is derived only from taskId and the chunking parameters, so a
    // resumed run writes to (and re-reads) the same file.
    File file = File(
      "tmp/source/${base64UrlEncode("$taskId.$maxChunkSize.$maxPostOverlap".encodedUtf8)}.txt",
    );

    await source.writeTo(file);

    IChunker chunker = IChunker(
      maxChunkSize: maxChunkSize,
      maxPostOverlap: maxPostOverlap,
    );

    // Group chunks into batches bounded by chunkBatchSize. Presumably each
    // element counts with weight 1 here; confirm against accumulateBy.
    Stream<List<Chunk>> stream = chunker
        .chunkTextFile(file)
        .accumulateBy(chunkBatchSize, (w) => 1, maxAmount: chunkBatchSize);

    await for (List<Chunk> i in stream) {
      // Out of time: stop without completing so the task can be resumed.
      if (!context.hasTimeRemaining) {
        return;
      }

      if (i.isEmpty) {
        continue;
      }

      List<Future<String>> work = [];

      for (Chunk chunk in i) {
        // Skip chunks already handled by a previous (interrupted) run.
        if (lastChunk != null && chunk.index <= lastChunk!) {
          continue;
        }

        work.add(addChunk(chunk));
      }

      // Stream.fromFutures emits results in completion order, so the chunk
      // ids inside an embed batch are not necessarily in chunk order.
      List<List<String>> toEmbed = await Stream.fromFutures(work)
          .accumulateBy(embedBatchSize, (w) => 1, maxAmount: embedBatchSize)
          .toList();

      // `index` below restarts at 0 for every chunk batch. Qualify the embed
      // task id with this batch's first chunk index so ids stay unique
      // across batches and deterministic across resumes. This assumes chunk
      // indices increase, which the lastChunk skip logic already relies on.
      final batchStart = i.first.index;

      await Future.wait(
        toEmbed.mapIndexed(
          (batch, index) => FireRag.instance.taskManager.schedule(
            TaskEmbed(
              bucket: bucket,
              taskId: "$taskId.embed.$batchStart.$index",
              collection: destinationCollection,
              chunks: batch,
            ),
          ),
        ),
      );

      lastChunk = i.last.index;
    }

    chunkingFinished = true;

    // The temporary copy of the source is not needed once chunking is done.
    try {
      await file.delete();
    } on FileSystemException {
      // Best-effort cleanup: a leftover temp file must not fail the task.
    }
  }

  if (shouldScheduleDistillation) {
    await FireRag.instance.taskManager.schedule(
      TaskDistill(
        bucket: bucket,
        embedBatchSize: embedBatchSize,
        taskId: "$taskId.distill.L0",
        record: record,
        lod: 0,
        collection: destinationCollection,
        targetOutputSize: maxChunkSize,
        factor: distillationFactor!,
        size: chunkCount,
      ),
    );
  }

  context.complete();
}