createMessageStream method

@override
Stream<StreamEvent> createMessageStream({
  required List<Message> messages,
  required String systemPrompt,
  List<ToolDefinition> tools = const [],
  int? maxTokens,
})
override

Streams a message completion via the Gemini `streamGenerateContent` endpoint.

The endpoint returns SSE events of the form:

data: {"candidates":[{"content":{"parts":[{"text":"..."}]}}]}

These are translated into the app's StreamEvent types so the rest of the codebase can consume them identically to Anthropic or OpenAI streams.

Implementation

/// Streams a message completion from the Gemini `streamGenerateContent`
/// endpoint, translated into the app's [StreamEvent] types.
///
/// The request body is built from [messages], [systemPrompt], [tools] and
/// [maxTokens] and POSTed as JSON with `alt=sse`. Each parsed SSE payload is
/// mapped as follows:
///
/// * A non-empty `text` part opens a text block (once) with a
///   [ContentBlockStartEvent] and is forwarded as a [ContentBlockDeltaEvent].
/// * A `functionCall` part closes any open text block, then emits a
///   start/delta/stop triple for a [ToolUseBlock] with a generated `call_` id.
/// * A `finishReason` closes any open text block and emits a
///   [MessageDeltaEvent] (stop reason plus token usage when present)
///   followed by a [MessageStopEvent].
///
/// A failure to connect and a non-200 response are both reported as a single
/// [ErrorEvent] instead of being thrown. Errors raised while reading the
/// response stream are not caught here and surface as errors on the returned
/// stream.
///
/// The HTTP client is always closed, including when the listener cancels
/// the stream early or an error interrupts parsing.
@override
Stream<StreamEvent> createMessageStream({
  required List<Message> messages,
  required String systemPrompt,
  List<ToolDefinition> tools = const [],
  int? maxTokens,
}) async* {
  final body = _buildRequestBody(
    messages: messages,
    systemPrompt: systemPrompt,
    tools: tools,
    maxTokens: maxTokens,
  );

  final url = '${config.baseUrl}/models/${config.model}'
      ':streamGenerateContent?alt=sse&key=${config.apiKey}';

  final request = http.Request('POST', Uri.parse(url));
  request.headers.addAll({
    'Content-Type': 'application/json',
    ...config.extraHeaders,
  });
  request.body = jsonEncode(body);

  _log.d('Gemini stream request to ${config.model}');

  final client = http.Client();
  // Everything that uses the client lives inside this try so the `finally`
  // releases the connection on every exit path: normal completion, early
  // return, a thrown error, or the consumer cancelling its subscription
  // (which makes a suspended `yield` behave like a `return`).
  try {
    http.StreamedResponse response;
    try {
      response = await client.send(request);
    } catch (e) {
      _log.e('Gemini connection error', error: e);
      yield ErrorEvent(
        message: 'Gemini connection error: $e',
        type: 'network_error',
      );
      return;
    }

    if (response.statusCode != 200) {
      final errorBody = await response.stream.bytesToString();
      _log.e('Gemini API error ${response.statusCode}: $errorBody');
      yield ErrorEvent(
        message: 'Gemini API error ${response.statusCode}: $errorBody',
        type: 'api_error',
      );
      return;
    }

    // Emit synthetic message-start event; the message id is generated
    // locally rather than read from the response.
    final messageId = 'msg_${_uuid.v4()}';
    yield MessageStartEvent(messageId: messageId, model: config.model);

    // Index of the content block currently being emitted, and whether a text
    // block at that index has been started but not yet stopped.
    var blockIndex = 0;
    var hasStartedText = false;

    await for (final event in _parseGeminiSSE(response.stream)) {
      final candidates = event['candidates'] as List?;
      if (candidates == null || candidates.isEmpty) {
        // Check for top-level errors; payloads with neither candidates nor
        // an error are skipped.
        final error = event['error'] as Map<String, dynamic>?;
        if (error != null) {
          yield ErrorEvent(
            message: error['message'] as String? ?? 'Unknown Gemini error',
            type: 'api_error',
          );
        }
        continue;
      }

      // Only the first candidate is consumed.
      final candidate = candidates[0] as Map<String, dynamic>;
      final content = candidate['content'] as Map<String, dynamic>?;
      final parts = content?['parts'] as List?;

      if (parts != null) {
        for (final part in parts) {
          final partMap = part as Map<String, dynamic>;

          // Handle text content.
          final text = partMap['text'] as String?;
          if (text != null && text.isNotEmpty) {
            if (!hasStartedText) {
              yield ContentBlockStartEvent(
                index: blockIndex,
                block: const TextBlock(''),
              );
              hasStartedText = true;
            }
            yield ContentBlockDeltaEvent(index: blockIndex, text: text);
          }

          // Handle function calls (tool use).
          final functionCall =
              partMap['functionCall'] as Map<String, dynamic>?;
          if (functionCall != null) {
            // A tool call gets its own block, so close the open text block
            // first and advance the index.
            if (hasStartedText) {
              yield ContentBlockStopEvent(index: blockIndex);
              blockIndex++;
              hasStartedText = false;
            }

            final name = functionCall['name'] as String? ?? '';
            final args =
                (functionCall['args'] as Map<String, dynamic>?) ?? {};
            final toolId = 'call_${_uuid.v4()}';

            yield ContentBlockStartEvent(
              index: blockIndex,
              block: ToolUseBlock(id: toolId, name: name, input: args),
            );
            // Emit the full arguments as a delta for consistency.
            yield ContentBlockDeltaEvent(
              index: blockIndex,
              text: jsonEncode(args),
            );
            yield ContentBlockStopEvent(index: blockIndex);
            blockIndex++;
          }
        }
      }

      // Check for finish reason.
      final finishReason = candidate['finishReason'] as String?;
      if (finishReason != null) {
        if (hasStartedText) {
          yield ContentBlockStopEvent(index: blockIndex);
          hasStartedText = false;
        }

        final stopReason = _parseFinishReason(finishReason);
        final usageMetadata = event['usageMetadata'] as Map<String, dynamic>?;

        yield MessageDeltaEvent(
          stopReason: stopReason,
          usage: usageMetadata != null
              ? TokenUsage(
                  inputTokens:
                      usageMetadata['promptTokenCount'] as int? ?? 0,
                  outputTokens:
                      usageMetadata['candidatesTokenCount'] as int? ?? 0,
                )
              : null,
        );
        yield const MessageStopEvent();
      }
    }
  } finally {
    client.close();
  }
}