sendStreamingChatRequest method

@override
Stream<Map<String, dynamic>> sendStreamingChatRequest(
  List<Map<String, dynamic>> messages,
  List<Map<String, dynamic>>? tools, {
  double? temperature,
  int? maxCompletionTokens,
  double? topP,
  String? reasoningEffort,
  dynamic stop,
  CancellationToken? cancellationToken,
})
override

Sends a streaming chat request using Anthropic's SSE protocol.

Implementation

/// Sends a streaming chat request using Anthropic's SSE protocol and adapts
/// the events into OpenAI-style chunk maps.
///
/// [messages] and [tools] are converted with `_convertToAnthropicFormat`
/// before being sent. [maxCompletionTokens] and [temperature] override the
/// instance-level `maxTokens` / `temperature` when provided.
///
/// NOTE(review): [topP], [reasoningEffort] and [stop] are accepted for
/// interface compatibility but are not forwarded in the request body.
///
/// The returned stream yields:
///  * one chunk per `text_delta` event, carrying only that text fragment;
///  * a final aggregate chunk with the full text, any tool calls (arguments
///    JSON-encoded), the mapped `finish_reason` (`tool_use` becomes
///    `tool_calls`) and token usage.
///
/// If [cancellationToken] is cancelled before the request is sent, a
/// `VanturaCancellationException` is thrown. If it is cancelled mid-stream,
/// reading stops and the final aggregate chunk is still emitted with whatever
/// was accumulated so far.
///
/// Throws a `VanturaApiException` when the HTTP status is not 200, or when
/// the server reports a failure through an SSE `error` event.
@override
Stream<Map<String, dynamic>> sendStreamingChatRequest(
  List<Map<String, dynamic>> messages,
  List<Map<String, dynamic>>? tools, {
  double? temperature,
  int? maxCompletionTokens,
  double? topP,
  String? reasoningEffort,
  dynamic stop,
  CancellationToken? cancellationToken,
}) async* {
  sdkLogger.info(
    'Opening Anthropic stream (model: $model)',
    tag: 'API',
    extra: {'model': model, 'message_count': messages.length},
  );

  if (cancellationToken?.isCancelled == true) {
    throw VanturaCancellationException();
  }

  final anthropicData = _convertToAnthropicFormat(messages, tools);
  final effectiveTemperature = temperature ?? this.temperature;

  final body = jsonEncode({
    'model': model,
    'max_tokens': maxCompletionTokens ?? maxTokens,
    'stream': true,
    if (anthropicData['system'] != null) 'system': anthropicData['system'],
    'messages': anthropicData['messages'],
    if (anthropicData['tools'] != null) 'tools': anthropicData['tools'],
    if (effectiveTemperature != null) 'temperature': effectiveTemperature,
  });

  final request = http.Request('POST', Uri.parse(baseUrl));
  request.headers.addAll({..._headers, 'accept': 'text/event-stream'});
  request.body = body;

  final streamed = await _httpClient.send(request);

  if (streamed.statusCode != 200) {
    final errorBody = await streamed.stream.bytesToString();
    throw VanturaApiException(
      'Anthropic streaming error',
      statusCode: streamed.statusCode,
      responseBody: errorBody,
    );
  }

  // Returns the value of an SSE `field:` line, or null when [line] carries a
  // different field. The space after the colon is optional in the SSE format.
  String? sseField(String line, String field) => line.startsWith('$field:')
      ? line.substring(field.length + 1).trim()
      : null;

  // --- SSE state machine ---
  final textBuffer = StringBuffer();
  // Map from content block index → {id, name, inputBuffer}
  final toolBlocks = <int, Map<String, dynamic>>{};
  String? stopReason;
  var inputTokens = 0;
  var outputTokens = 0;
  String? currentEventType;

  await for (final line
      in streamed.stream
          .takeWhile((_) => !(cancellationToken?.isCancelled ?? false))
          .transform(utf8.decoder)
          .transform(const LineSplitter())) {
    if (cancellationToken?.isCancelled == true) break;

    final trimmed = line.trim();
    if (trimmed.isEmpty) continue;

    // Capture the event-type line (comes before the data line in Anthropic SSE)
    final eventName = sseField(trimmed, 'event');
    if (eventName != null) {
      currentEventType = eventName;
      continue;
    }

    final rawData = sseField(trimmed, 'data');
    if (rawData == null) continue;
    if (rawData == '[DONE]') break;

    // The pending event name belongs to this data line only. Clear it before
    // decoding so a malformed payload cannot leak it onto the next event.
    final declaredType = currentEventType;
    currentEventType = null;

    Map<String, dynamic> event;
    try {
      final decoded = jsonDecode(rawData);
      if (decoded is! Map<String, dynamic>) continue; // not a JSON object
      event = decoded;
    } on FormatException {
      continue; // skip malformed lines
    }

    // Prefer the explicit event: line; fall back to the 'type' field in data
    final eventType = declaredType ?? event['type'] as String?;

    switch (eventType) {
      case 'message_start':
        // Capture input token count from the opening message event
        final msg = event['message'] as Map<String, dynamic>?;
        final usage = msg?['usage'] as Map<String, dynamic>?;
        inputTokens = (usage?['input_tokens'] as num?)?.toInt() ?? 0;

      case 'content_block_start':
        // Register a new content block — only care about tool_use blocks
        final index = (event['index'] as int?) ?? 0;
        final block = event['content_block'] as Map<String, dynamic>?;
        if (block != null && block['type'] == 'tool_use') {
          toolBlocks[index] = {
            'id': block['id'],
            'name': block['name'],
            'inputBuffer': StringBuffer(),
          };
        }

      case 'content_block_delta':
        final index = (event['index'] as int?) ?? 0;
        final delta = event['delta'] as Map<String, dynamic>?;
        if (delta == null) break;

        if (delta['type'] == 'text_delta') {
          // Incremental text token — yield immediately and accumulate
          final chunk = (delta['text'] as String?) ?? '';
          textBuffer.write(chunk);
          yield {
            'choices': [
              {
                'delta': {'role': 'assistant', 'content': chunk},
                'finish_reason': null,
              },
            ],
          };
        } else if (delta['type'] == 'input_json_delta') {
          // Partial JSON for tool arguments — accumulate only, don't yield
          final partial = (delta['partial_json'] as String?) ?? '';
          (toolBlocks[index]?['inputBuffer'] as StringBuffer?)?.write(
            partial,
          );
        }

      case 'message_delta':
        // Capture stop_reason and final output token count. Keep the last
        // non-null values so a later delta that omits them does not erase
        // what was already reported.
        final delta = event['delta'] as Map<String, dynamic>?;
        stopReason = (delta?['stop_reason'] as String?) ?? stopReason;
        final usage = event['usage'] as Map<String, dynamic>?;
        outputTokens =
            (usage?['output_tokens'] as num?)?.toInt() ?? outputTokens;

      case 'error':
        // Failures that occur after the HTTP 200 response has started are
        // delivered as an SSE `error` event; surface them instead of
        // emitting a final chunk that looks like a successful completion.
        final error = event['error'];
        final message = error is Map ? error['message'] : null;
        throw VanturaApiException(
          message is String && message.isNotEmpty
              ? 'Anthropic streaming error: $message'
              : 'Anthropic streaming error',
          statusCode: streamed.statusCode,
          responseBody: rawData,
        );

      case 'message_stop':
        break;

      default:
        break;
    }
  }

  // --- Emit the final aggregate chunk ---
  final finalText = textBuffer.toString();

  List<Map<String, dynamic>>? toolCalls;
  if (toolBlocks.isNotEmpty) {
    toolCalls = toolBlocks.values.map((block) {
      final inputStr = (block['inputBuffer'] as StringBuffer).toString();
      dynamic parsedInput;
      try {
        parsedInput = jsonDecode(inputStr.isEmpty ? '{}' : inputStr);
      } on FormatException {
        // Incomplete or invalid argument JSON — fall back to empty arguments
        parsedInput = <String, dynamic>{};
      }
      return <String, dynamic>{
        'id': block['id'],
        'type': 'function',
        'function': {
          'name': block['name'],
          'arguments': jsonEncode(parsedInput),
        },
      };
    }).toList();
  }

  yield {
    'choices': [
      {
        'delta': {
          'role': 'assistant',
          'content': finalText.isEmpty ? null : finalText,
          if (toolCalls != null) 'tool_calls': toolCalls,
        },
        'finish_reason': stopReason == 'tool_use' ? 'tool_calls' : stopReason,
      },
    ],
    'usage': {
      'prompt_tokens': inputTokens,
      'completion_tokens': outputTokens,
      'total_tokens': inputTokens + outputTokens,
    },
  };
}