consume method

Consumes the stream of chunks and returns a stream of chunks.

Implementation

Future<AiSDKConsumeValue> consume() async {
  final chunks = <BaseChunk>[];
  final controller = StreamController<BaseChunk>();

  () async {
    stream.listen(
      (event) {
        if (controller.isClosed) return;
        try {
          final decodedChunk = String.fromCharCodes(event);
          final decodedChunks = decodedChunk.split('\n').where((line) => line.trim().isNotEmpty).toList(); // stream smoothing sends multiple chunks at once
          for (final chunk in decodedChunks) {
            if (controller.isClosed) break;

            if (chunk.startsWith("0:")) {
              final data = chunk.substring(2);
              final textChunk = TextChunk(data: _cleanChunk(data));
              controller.add(textChunk);
              chunks.add(textChunk);
            } else if (chunk.startsWith("g:")) {
              final data = chunk.substring(2);
              final reasoningChunk = ReasoningChunk(data: _cleanChunk(data));
              controller.add(reasoningChunk);
              chunks.add(reasoningChunk);
            } else if (chunk.startsWith("i:")) {
              final data = chunk.substring(2);
              final redactedReasoningChunk = RedactedReasoningChunk(data: jsonDecode(data));
              controller.add(redactedReasoningChunk);
              chunks.add(redactedReasoningChunk);
            } else if (chunk.startsWith("j:")) {
              final data = chunk.substring(2);
              final reasoningSignatureChunk = ReasoningSignatureChunk(data: jsonDecode(data));
              controller.add(reasoningSignatureChunk);
              chunks.add(reasoningSignatureChunk);
            } else if (chunk.startsWith("h:")) {
              final data = chunk.substring(2);
              final sourceChunk = SourceChunk(data: jsonDecode(data));
              controller.add(sourceChunk);
              chunks.add(sourceChunk);
            } else if (chunk.startsWith("k:")) {
              final data = chunk.substring(2);
              final fileChunk = FileChunk(data: jsonDecode(data));
              controller.add(fileChunk);
              chunks.add(fileChunk);
            } else if (chunk.startsWith("2:")) {
              final data = chunk.substring(2);
              final dataChunk = DataChunk(data: jsonDecode(data));
              controller.add(dataChunk);
              chunks.add(dataChunk);
            } else if (chunk.startsWith("8:")) {
              final data = chunk.substring(2);
              final messageAnnotationChunk = MessageAnnotation(data: jsonDecode(data));
              controller.add(messageAnnotationChunk);
              chunks.add(messageAnnotationChunk);
            } else if (chunk.startsWith("3:")) {
              final data = chunk.substring(2);
              final errorChunk = ErrorChunk(data: _cleanChunk(data));
              controller.add(errorChunk);
              chunks.add(errorChunk);
            } else if (chunk.startsWith("b:")) {
              final data = chunk.substring(2);
              final toolCallStreamingStartChunk = ToolCallStreamingStartChunk(data: jsonDecode(data));
              controller.add(toolCallStreamingStartChunk);
              chunks.add(toolCallStreamingStartChunk);
            } else if (chunk.startsWith("c:")) {
              final data = chunk.substring(2);
              final toolCallDeltaChunk = ToolCallDeltaChunk(data: jsonDecode(data));
              controller.add(toolCallDeltaChunk);
              chunks.add(toolCallDeltaChunk);
            } else if (chunk.startsWith("9:")) {
              final data = chunk.substring(2);
              final toolCallChunk = ToolCallChunk(data: jsonDecode(data));
              controller.add(toolCallChunk);
              chunks.add(toolCallChunk);
            } else if (chunk.startsWith("a:")) {
              final data = chunk.substring(2);
              final toolResultChunk = ToolResultChunk(data: jsonDecode(data));
              controller.add(toolResultChunk);
              chunks.add(toolResultChunk);
            } else if (chunk.startsWith("f:")) {
              final data = chunk.substring(2);
              final startStepChunk = StartStepChunk(data: jsonDecode(data));
              controller.add(startStepChunk);
              chunks.add(startStepChunk);
            } else if (chunk.startsWith("e:")) {
              final data = chunk.substring(2);
              final finishStepChunk = FinishStepChunk(data: jsonDecode(data));
              controller.add(finishStepChunk);
              chunks.add(finishStepChunk);
            } else if (chunk.startsWith("d:")) {
              final data = chunk.substring(2);
              final finishMessageChunk = FinishMessageChunk(data: jsonDecode(data));
              controller.add(finishMessageChunk);
              chunks.add(finishMessageChunk);
            } else {
              final unknownChunk = UnknownChunk(data: chunk);
              controller.add(unknownChunk);
              chunks.add(unknownChunk);
            }
            if (chunk.startsWith(endChunkPrefix)) {
              controller.close();
              break;
            }
          }
        } catch (e) {}
      },
      onError: (error, stackTrace) {
        print(error);
        print(stackTrace);
      },
      onDone: () {
        print('done');
      },
    );
  }();

  return AiSDKConsumeValue(controller: controller, chunks: chunks);
}