workOnTask method

@override
Future<void> workOnTask(
  TaskExecutionContext context
)
override

Implementation

/// Chunks this task's source text, schedules embedding tasks for each chunk
/// batch, and optionally schedules a level-0 distillation follow-up task.
///
/// The method is resumable: [lastChunk] and [chunkingFinished] record
/// progress, so when [context] runs out of time the method returns without
/// calling `context.complete()` and a later invocation picks up where it
/// left off (already-processed chunks are skipped by index).
@override
Future<void> workOnTask(TaskExecutionContext context) async {
  if (!chunkingFinished) {
    // Write the source to a deterministic temp path (derived from taskId and
    // the chunking parameters) so a resumed run re-chunks identical input and
    // chunk indices stay stable across invocations.
    // NOTE(review): assumes the `encodedUtf8` extension yields the byte list
    // that base64UrlEncode expects — confirm against its declaration.
    File file = File(
      "tmp/source/${base64UrlEncode("$taskId.$maxChunkSize.$maxPostOverlap".encodedUtf8)}.txt",
    );

    await source.writeTo(file);

    IChunker chunker = IChunker(
      maxChunkSize: maxChunkSize,
      maxPostOverlap: maxPostOverlap,
    );

    // Group the chunk stream into lists of up to chunkBatchSize elements
    // before processing (accumulateBy is a project-local stream extension).
    Stream<List<Chunk>> stream = chunker
        .chunkTextFile(file)
        .accumulateBy(chunkBatchSize, (w) => 1, maxAmount: chunkBatchSize);

    await for (List<Chunk> i in stream) {
      // Out of execution time: return WITHOUT completing the context so the
      // task is retried; lastChunk preserves how far we got.
      if (!context.hasTimeRemaining) {
        return;
      }

      if (i.isEmpty) {
        continue;
      }

      List<Future<String>> work = [];

      for (Chunk chunk in i) {
        // Skip chunks already handled by a previous (timed-out) run.
        if (lastChunk != null && chunk.index <= lastChunk!) {
          continue;
        }

        work.add(addChunk(chunk));
      }

      // Re-batch the per-chunk results into embed-sized groups. Futures are
      // consumed in completion order via Stream.fromFutures, so batch
      // membership is not guaranteed to follow chunk order.
      List<List<String>> toEmbed = await Stream.fromFutures(work)
          .accumulateBy(embedBatchSize, (w) => 1, maxAmount: embedBatchSize)
          .toList();

      // NOTE(review): `index` restarts at 0 on every outer stream batch, so
      // "$taskId.embed.$index" can repeat across iterations — confirm the
      // task manager tolerates or dedupes repeated task IDs.
      // NOTE(review): this callback treats the FIRST argument as the element;
      // package:collection's mapIndexed passes (index, element) — confirm
      // this is a project-local extension with the swapped order.
      await Future.wait(
        toEmbed.mapIndexed(
          (batch, index) => FireRag.instance.taskManager.schedule(
            TaskEmbed(
              lod: 0,
              bucket: bucket,
              taskId: "$taskId.embed.$index",
              collection: destinationCollection,
              recordLocation: recordLocation,
              chunks: batch,
            ),
          ),
        ),
      );

      // Persist progress only after the whole batch has been scheduled.
      lastChunk = i.last.index;
    }

    // All chunks scheduled: report the total core-chunk count once.
    await FireRag.instance.pushRecordProgress(
      location: recordLocation,
      deltaCoreTotal: chunkCount,
    );
    chunkingFinished = true;
  }
  if (shouldScheduleDistillation) {
    // Report the expected distilled total and enqueue the level-0 (lod: 0)
    // distillation task concurrently. distillationFactor is force-unwrapped:
    // shouldScheduleDistillation presumably implies it is non-null — verify.
    await Future.wait([
      FireRag.instance.pushRecordProgress(
        location: recordLocation,
        deltaDistilledTotal: (chunkCount / distillationFactor!.toDouble())
            .ceil(),
      ),
      FireRag.instance.taskManager.schedule(
        TaskDistill(
          bucket: bucket,
          embedBatchSize: embedBatchSize,
          taskId: "$taskId.distill.L0",
          recordLocation: recordLocation,
          lod: 0,
          collection: destinationCollection,
          targetOutputSize: maxChunkSize,
          factor: distillationFactor!,
          size: chunkCount,
        ),
      ),
    ]);
  }

  context.complete();
}