getStreamingChatResponse method

@override
Stream<ChatResponseUpdate> getStreamingChatResponse({
  required Iterable<ChatMessage> messages,
  ChatOptions? options,
  CancellationToken? cancellationToken,
})
override

Sends a chat request and returns a stream of response updates.

Implementation

/// Streams a chat response, invoking any tools the model requests.
///
/// When [_getAllTools] yields no tools for [options], the call is forwarded
/// unchanged to the superclass. Otherwise the returned stream repeatedly:
///
/// 1. streams one response from the superclass, forwarding every update
///    that carries no [FunctionCallContent];
/// 2. stops if that response requested no function calls;
/// 3. otherwise invokes the requested functions via [_invokeFunctions],
///    appends the assistant turn and the tool results to a private copy of
///    [messages], and starts over.
///
/// The stream ends without emitting an error when more than
/// [maximumIterationsPerRequest] rounds request function calls, when
/// [maximumConsecutiveErrorsPerRequest] consecutive rounds contain a failed
/// invocation, or when any invocation result asks to terminate.
///
/// [cancellationToken] is passed through to the superclass and to
/// [_invokeFunctions].
@override
Stream<ChatResponseUpdate> getStreamingChatResponse({
  required Iterable<ChatMessage> messages,
  ChatOptions? options,
  CancellationToken? cancellationToken,
}) {
  final allTools = _getAllTools(options);
  if (allTools.isEmpty) {
    // Nothing to invoke: behave exactly like the wrapped client.
    return super.getStreamingChatResponse(
      messages: messages,
      options: options,
      cancellationToken: cancellationToken,
    );
  }

  Stream<ChatResponseUpdate> stream() async* {
    // Work on a copy so the conversation can grow without touching the
    // caller's collection.
    final messageList = messages.toList();
    var iterations = 0;
    var consecutiveErrors = 0;

    while (true) {
      // Everything received in this round, kept to rebuild the assistant turn.
      final updates = <ChatResponseUpdate>[];
      final functionCalls = <FunctionCallContent>[];

      await for (final update in super.getStreamingChatResponse(
        messages: messageList,
        options: options,
        cancellationToken: cancellationToken,
      )) {
        updates.add(update);

        // Scan the update once and reuse the result for both decisions.
        final calls =
            update.contents.whereType<FunctionCallContent>().toList();
        functionCalls.addAll(calls);

        // Only updates without any function call are forwarded.
        // NOTE(review): an update that mixes function calls with other
        // content is withheld in its entirety - confirm this is intended.
        if (calls.isEmpty) {
          yield update;
        }
      }

      // No tool requests: the response streamed above was the final answer.
      if (functionCalls.isEmpty) return;

      // NOTE(review): each early return below ends the stream silently; the
      // pending function calls are never surfaced to the listener.
      iterations++;
      if (iterations > maximumIterationsPerRequest) return;

      // Replay the full assistant turn (function calls included) so the next
      // request sees what the tool results are answering.
      messageList.add(ChatMessage(
        role: ChatRole.assistant,
        contents: <AIContent>[
          for (final update in updates) ...update.contents,
        ],
      ));

      final results = await _invokeFunctions(
        functionCalls,
        allTools,
        cancellationToken,
      );

      // A round counts as failed if any single invocation failed; one clean
      // round resets the streak.
      final hasErrors =
          results.any((r) => r.status == FunctionInvocationStatus.exception);
      if (hasErrors) {
        consecutiveErrors++;
        if (consecutiveErrors >= maximumConsecutiveErrorsPerRequest) return;
      } else {
        consecutiveErrors = 0;
      }

      if (results.any((r) => r.terminate)) return;

      // Build a genuine List<AIContent> rather than a cast() view over a
      // List<FunctionResultContent>, so the message's contents accept any
      // AIContent if they are modified later.
      final resultContents = <AIContent>[];
      for (final r in results) {
        // Copy to a local so the `is` check promotes the value; anything that
        // is not an Exception is reported as null, as before.
        final error = r.exception;
        resultContents.add(FunctionResultContent(
          callId: r.callContent.callId,
          result: r.result,
          exception: error is Exception ? error : null,
        ));
      }

      messageList.add(ChatMessage(
        role: ChatRole.tool,
        contents: resultContents,
      ));
    }
  }

  return stream();
}