sendMessageStream method

Stream<GenerateContentResponse> sendMessageStream(
  1. Content message
)

Continues the chat with a new message.

Sends message to the model as a continuation of the chat history and reads the response in a stream. Prepends the history to the request and uses the provided model to generate new content.

When there are no candidates in any response in the stream, the message and responses are ignored and will not be recorded in the history.

Waits for any ongoing or pending requests to sendMessage or sendMessageStream to complete before generating new content. Successful messages and responses for ongoing or pending requests will be reflected in the history sent for this message.

Waits to read the entire streamed response before recording the message and response and allowing pending messages to be sent.

Implementation

Stream<GenerateContentResponse> sendMessageStream(Content message) {
  final controller = StreamController<GenerateContentResponse>();
  _mutex.acquire().then((lock) async {
    try {
      final requestHistory = <Content>[message];
      var turn = 0;
      while (turn < _maxTurns) {
        final responses = _generateContentStream(
            _history.followedBy(requestHistory),
            safetySettings: _safetySettings,
            generationConfig: _generationConfig);

        final turnChunks = <GenerateContentResponse>[];
        await for (final response in responses) {
          turnChunks.add(response);
          controller.add(response);
        }
        if (turnChunks.isEmpty) break;
        final aggregatedContent = historyAggregate(turnChunks.map((r) {
          final content = r.candidates.firstOrNull?.content;
          if (content == null) {
            throw Exception('No content in response candidate');
          }
          return content;
        }).toList());

        final functionCalls =
            aggregatedContent.parts.whereType<FunctionCall>().toList();

        // Check if we should actually execute these functions.
        final shouldAutoExecute = _autoFunctionDeclarations != null &&
            _autoFunctionDeclarations.isNotEmpty &&
            functionCalls.isNotEmpty &&
            functionCalls
                .every((c) => _autoFunctionDeclarations.containsKey(c.name));

        if (!shouldAutoExecute) {
          _history.addAll(requestHistory);
          _history.add(aggregatedContent);
          return;
        }

        requestHistory.add(aggregatedContent);
        final functionResponseFutures =
            functionCalls.map((functionCall) async {
          final function = _autoFunctionDeclarations[functionCall.name];

          Object? result;
          try {
            result = await function!.callable(functionCall.args);
          } catch (e) {
            result = e.toString();
          }
          return FunctionResponse(functionCall.name, {'result': result});
        });
        final functionResponseParts =
            await Future.wait(functionResponseFutures);
        requestHistory.add(Content.functionResponses(functionResponseParts));
        turn++;
      }
      throw Exception('Max turns of $_maxTurns reached.');
    } catch (e, s) {
      controller.addError(e, s);
    } finally {
      lock.release();
      unawaited(controller.close());
    }
  });
  return controller.stream;
}