sendStreamingChatRequest method

@override
Stream<Map<String, dynamic>> sendStreamingChatRequest(
  List<Map<String, dynamic>> messages,
  List<Map<String, dynamic>>? tools, {
  double? temperature,
  int? maxCompletionTokens,
  double? topP,
  String? reasoningEffort,
  dynamic stop,
  CancellationToken? cancellationToken,
})
override

Sends a streaming chat request to the LLM, yielding chunks as they arrive.

Implementation

/// Sends a streaming chat request to Gemini and yields converted chunks as
/// they arrive.
///
/// [messages] and [tools] are converted with `_convertToGeminiFormat` and
/// POSTed to the `streamGenerateContent` endpoint with `alt=sse`. Every
/// `data:` line of the response is JSON-decoded and passed through
/// `_convertFromGeminiResponse` (with `isDelta: true`) before being yielded.
///
/// Generation settings:
/// * [maxCompletionTokens] falls back to the instance-level `maxTokens`.
/// * [temperature] falls back to the instance-level temperature and is
///   omitted from the request when both are null.
/// * [topP] is sent only when non-null.
/// * [stop] is sent as `stopSequences`; a non-list value is wrapped in a
///   single-element list.
/// * [reasoningEffort] is not read by this method.
///
/// Cancellation: throws [VanturaCancellationException] when
/// [cancellationToken] is already cancelled before the request is sent. When
/// the token is cancelled while the response is being consumed, the stream
/// simply ends without raising.
///
/// Throws [VanturaApiException] (carrying the status code and response body)
/// when the server answers with a status other than 200.
///
/// Events that fail to decode or convert are skipped so one bad chunk does
/// not abort the whole stream; each skipped event is logged.
@override
Stream<Map<String, dynamic>> sendStreamingChatRequest(
  List<Map<String, dynamic>> messages,
  List<Map<String, dynamic>>? tools, {
  double? temperature,
  int? maxCompletionTokens,
  double? topP,
  String? reasoningEffort,
  dynamic stop,
  CancellationToken? cancellationToken,
}) async* {
  sdkLogger.info(
    'Opening Gemini stream (model: $model)',
    tag: 'API',
    extra: {'model': model, 'message_count': messages.length},
  );

  final geminiData = _convertToGeminiFormat(messages, tools);

  // Resolve the per-call override against the instance default once.
  final effectiveTemperature = temperature ?? this.temperature;

  final generationConfig = {
    'maxOutputTokens': maxCompletionTokens ?? maxTokens,
    if (effectiveTemperature != null) 'temperature': effectiveTemperature,
    if (topP != null) 'topP': topP,
    if (stop != null) 'stopSequences': stop is List ? stop : [stop],
  };

  final bodyMap = {
    if (geminiData['systemInstruction'] != null)
      'systemInstruction': geminiData['systemInstruction'],
    'contents': geminiData['contents'],
    if (geminiData['tools'] != null) 'tools': geminiData['tools'],
    'generationConfig': generationConfig,
  };

  // Last chance to bail out before any network traffic is generated.
  if (cancellationToken?.isCancelled == true) {
    throw VanturaCancellationException();
  }

  final url =
      '$baseUrl/models/$model:streamGenerateContent?alt=sse&key=$apiKey';

  final request = http.Request('POST', Uri.parse(url));
  request.headers.addAll(_headers);
  request.body = jsonEncode(bodyMap);

  final streamedResponse = await _httpClient.send(request);

  if (streamedResponse.statusCode != 200) {
    final errorBody = await streamedResponse.stream.bytesToString();
    throw VanturaApiException(
      'Gemini streaming error',
      statusCode: streamedResponse.statusCode,
      responseBody: errorBody,
    );
  }

  // `takeWhile` stops consuming the byte stream as soon as a chunk arrives
  // after cancellation; the check inside the loop covers lines that were
  // already buffered by the decoder/splitter.
  await for (final line
      in streamedResponse.stream
          .takeWhile((_) => !(cancellationToken?.isCancelled ?? false))
          .transform(utf8.decoder)
          .transform(const LineSplitter())) {
    if (cancellationToken?.isCancelled == true) break;

    final trimmed = line.trim();
    if (trimmed.isEmpty) continue;

    // The space after the colon is optional in the event-stream format, so
    // accept both `data: {...}` and `data:{...}`.
    if (!trimmed.startsWith('data:')) continue;
    final data = trimmed.substring(5).trim();
    if (data.isEmpty) continue;

    try {
      final json = jsonDecode(data);
      yield _convertFromGeminiResponse(json, isDelta: true);
    } catch (error) {
      // Best-effort streaming: drop the offending event but leave a trace so
      // lost chunks can be diagnosed instead of vanishing silently.
      sdkLogger.info(
        'Skipping undecodable Gemini stream event',
        tag: 'API',
        extra: {
          'model': model,
          'error': error.toString(),
          'payload_length': data.length,
        },
      );
      continue;
    }
  }
}