sendStreamingChatRequest method

Stream<Map<String, dynamic>> sendStreamingChatRequest(
  1. List<Map<String, dynamic>> messages,
  2. List<Map<String, dynamic>>? tools, {
  3. double? temperature,
  4. int? maxCompletionTokens,
  5. double? topP,
  6. String? reasoningEffort,
  7. dynamic stop,
  8. CancellationToken? cancellationToken,
})

Sends a streaming chat request to the API.

Implementation

Stream<Map<String, dynamic>> sendStreamingChatRequest(
  List<Map<String, dynamic>> messages,
  List<Map<String, dynamic>>? tools, {
  double? temperature,
  int? maxCompletionTokens,
  double? topP,
  String? reasoningEffort,
  dynamic stop,
  CancellationToken? cancellationToken,
}) async* {
  sdkLogger.info(
    'Sending streaming chat request to API (model: $model)',
    tag: 'API',
    extra: {
      'model': model,
      'message_count': messages.length,
      'has_tools': tools != null && tools.isNotEmpty,
      if (sdkLogger.options.logSensitiveContent) 'messages': messages,
    },
  );

  final headers = {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer $apiKey',
  };

  final body = jsonEncode({
    'model': model,
    'messages': messages,
    'tools': tools,
    'tool_choice': 'auto',
    'stream': true,
    'stream_options': {"include_usage": true},
    if ((temperature ?? this.temperature) != null)
      'temperature': temperature ?? this.temperature,
    if ((maxCompletionTokens ?? this.maxCompletionTokens) != null)
      'max_completion_tokens':
          maxCompletionTokens ?? this.maxCompletionTokens,
    if ((topP ?? this.topP) != null) 'top_p': topP ?? this.topP,
    if ((reasoningEffort ?? this.reasoningEffort) != null)
      'reasoning_effort': reasoningEffort ?? this.reasoningEffort,
    if ((stop ?? this.stop) != null) 'stop': stop ?? this.stop,
  });

  try {
    final request = http.Request('POST', Uri.parse(baseUrl));
    request.headers.addAll(headers);
    request.body = body;

    sdkLogger.debug(
      'Opening API stream connection',
      tag: 'API',
      extra: {
        'headers': {...headers, 'Authorization': 'Bearer [REDACTED]'},
      },
    );

    final response = await _httpClient.send(request);

    if (response.statusCode != 200) {
      final errorBody = await response.stream.bytesToString();
      sdkLogger.error(
        'Streaming requested failed',
        tag: 'API',
        extra: {
          'status_code': response.statusCode,
          if (sdkLogger.options.logSensitiveContent) 'error_body': errorBody,
        },
      );
      throw Exception('Streaming request failed: ${response.statusCode}');
    }

    yield* response.stream
        .takeWhile((_) => !(cancellationToken?.isCancelled ?? false))
        .transform(utf8.decoder)
        .transform(const LineSplitter())
        .where((line) => line.trim().isNotEmpty)
        .map((line) {
          if (line.startsWith('data: ')) {
            final data = line.substring(6).trim();
            if (data == '[DONE]') return null;
            try {
              return jsonDecode(data) as Map<String, dynamic>;
            } catch (e) {
              sdkLogger.error('Error decoding SSE chunk', tag: 'API');
              return null;
            }
          }
          return null;
        })
        .where((json) => json != null)
        .cast<Map<String, dynamic>>();
  } catch (e, stackTrace) {
    sdkLogger.error(
      'Error in streaming request',
      tag: 'API',
      error: e,
      stackTrace: stackTrace,
    );
    rethrow;
  }
}