sendStreamingChatRequest method

Stream<Map<String, dynamic>> sendStreamingChatRequest(
  List<Map<String, dynamic>> messages,
  List<Map<String, dynamic>>? tools, {
  double? temperature,
  int? maxCompletionTokens,
  double? topP,
  String? reasoningEffort,
  dynamic stop,
  CancellationToken? cancellationToken,
})

override

Sends a streaming chat request using Anthropic's SSE protocol.

Implementation
/// Sends a streaming chat request using Anthropic's SSE protocol.
///
/// Converts OpenAI-style [messages] and [tools] to Anthropic's Messages API
/// format via `_convertToAnthropicFormat`, opens an SSE stream, and yields
/// OpenAI-compatible chunks: one `{'choices': [{'delta': ...}]}` per text
/// token as it arrives, then a final aggregate chunk carrying the full text,
/// any accumulated tool calls, the finish reason, and token usage.
///
/// [stop] may be a `String` or a `List` of strings; it is forwarded as
/// Anthropic's `stop_sequences`. [topP] is forwarded as `top_p` when set.
/// [reasoningEffort] is accepted for interface compatibility but has no
/// Anthropic equivalent and is ignored here.
///
/// Throws [VanturaCancellationException] if [cancellationToken] is cancelled
/// before the request is sent, and [VanturaApiException] on a non-200
/// response. Cancellation mid-stream stops reading and still emits the final
/// aggregate chunk with whatever was received so far.
@override
Stream<Map<String, dynamic>> sendStreamingChatRequest(
  List<Map<String, dynamic>> messages,
  List<Map<String, dynamic>>? tools, {
  double? temperature,
  int? maxCompletionTokens,
  double? topP,
  String? reasoningEffort,
  dynamic stop,
  CancellationToken? cancellationToken,
}) async* {
  sdkLogger.info(
    'Opening Anthropic stream (model: $model)',
    tag: 'API',
    extra: {'model': model, 'message_count': messages.length},
  );
  if (cancellationToken?.isCancelled == true) {
    throw VanturaCancellationException();
  }
  final anthropicData = _convertToAnthropicFormat(messages, tools);

  // FIX: `stop` and `topP` were previously accepted but silently dropped.
  // Normalize `stop` (String or List) into Anthropic's `stop_sequences` list.
  final List<String>? stopSequences = switch (stop) {
    null => null,
    final String s => [s],
    final List<dynamic> l => [for (final e in l) e.toString()],
    _ => null, // unsupported shape — ignore rather than send malformed JSON
  };

  final body = jsonEncode({
    'model': model,
    'max_tokens': maxCompletionTokens ?? maxTokens,
    'stream': true,
    if (anthropicData['system'] != null) 'system': anthropicData['system'],
    'messages': anthropicData['messages'],
    if (anthropicData['tools'] != null) 'tools': anthropicData['tools'],
    if ((temperature ?? this.temperature) != null)
      'temperature': temperature ?? this.temperature,
    if (topP != null) 'top_p': topP,
    if (stopSequences != null && stopSequences.isNotEmpty)
      'stop_sequences': stopSequences,
  });

  final request = http.Request('POST', Uri.parse(baseUrl));
  request.headers.addAll({..._headers, 'accept': 'text/event-stream'});
  request.body = body;
  final streamed = await _httpClient.send(request);
  if (streamed.statusCode != 200) {
    // Drain the error body so the exception carries the server's message.
    final errorBody = await streamed.stream.bytesToString();
    throw VanturaApiException(
      'Anthropic streaming error',
      statusCode: streamed.statusCode,
      responseBody: errorBody,
    );
  }

  // --- SSE state machine ---
  final StringBuffer textBuffer = StringBuffer();
  // Map from content block index → {id, name, inputBuffer}
  final Map<int, Map<String, dynamic>> toolBlocks = {};
  String? stopReason;
  int inputTokens = 0;
  int outputTokens = 0;
  String? currentEventType;

  await for (final line
      in streamed.stream
          .takeWhile((_) => !(cancellationToken?.isCancelled ?? false))
          .transform(utf8.decoder)
          .transform(const LineSplitter())) {
    if (cancellationToken?.isCancelled == true) break;
    final trimmed = line.trim();
    if (trimmed.isEmpty) continue;
    // Capture the event-type line (comes before the data line in Anthropic SSE)
    if (trimmed.startsWith('event: ')) {
      currentEventType = trimmed.substring(7).trim();
      continue;
    }
    if (!trimmed.startsWith('data: ')) continue;
    final rawData = trimmed.substring(6).trim();
    if (rawData == '[DONE]') break;
    Map<String, dynamic> event;
    try {
      event = jsonDecode(rawData) as Map<String, dynamic>;
    } catch (_) {
      continue; // skip malformed lines
    }
    // Prefer the explicit event: line; fall back to the 'type' field in data
    final eventType = currentEventType ?? event['type'] as String?;
    currentEventType = null; // reset after consuming
    switch (eventType) {
      case 'message_start':
        // Capture input token count from the opening message event
        final msg = event['message'] as Map<String, dynamic>?;
        final usage = msg?['usage'] as Map<String, dynamic>?;
        inputTokens = (usage?['input_tokens'] as int?) ?? 0;
      case 'content_block_start':
        // Register a new content block — only care about tool_use blocks
        final index = (event['index'] as int?) ?? 0;
        final block = event['content_block'] as Map<String, dynamic>?;
        if (block?['type'] == 'tool_use') {
          toolBlocks[index] = {
            'id': block!['id'],
            'name': block['name'],
            'inputBuffer': StringBuffer(),
          };
        }
      case 'content_block_delta':
        final index = (event['index'] as int?) ?? 0;
        final delta = event['delta'] as Map<String, dynamic>?;
        if (delta == null) break;
        if (delta['type'] == 'text_delta') {
          // Incremental text token — yield immediately and accumulate
          final chunk = (delta['text'] as String?) ?? '';
          textBuffer.write(chunk);
          yield {
            'choices': [
              {
                'delta': {'role': 'assistant', 'content': chunk},
                'finish_reason': null,
              },
            ],
          };
        } else if (delta['type'] == 'input_json_delta') {
          // Partial JSON for tool arguments — accumulate only, don't yield
          final partial = (delta['partial_json'] as String?) ?? '';
          (toolBlocks[index]?['inputBuffer'] as StringBuffer?)?.write(
            partial,
          );
        }
      case 'message_delta':
        // Capture stop_reason and final output token count.
        // FIX: keep a previously captured stop_reason if a later
        // message_delta omits the field (previously clobbered to null).
        final delta = event['delta'] as Map<String, dynamic>?;
        stopReason = (delta?['stop_reason'] as String?) ?? stopReason;
        final usage = event['usage'] as Map<String, dynamic>?;
        outputTokens = (usage?['output_tokens'] as int?) ?? outputTokens;
      case 'message_stop':
        break;
      default:
        break;
    }
  }

  // --- Emit the final aggregate chunk ---
  final finalText = textBuffer.toString();
  List<Map<String, dynamic>>? toolCalls;
  if (toolBlocks.isNotEmpty) {
    toolCalls = toolBlocks.entries.map((e) {
      final block = e.value;
      final inputStr = (block['inputBuffer'] as StringBuffer).toString();
      dynamic parsedInput;
      try {
        parsedInput = jsonDecode(inputStr.isEmpty ? '{}' : inputStr);
      } catch (_) {
        // Truncated/invalid tool-argument JSON — fall back to empty args.
        parsedInput = <String, dynamic>{};
      }
      return {
        'id': block['id'],
        'type': 'function',
        'function': {
          'name': block['name'],
          'arguments': jsonEncode(parsedInput),
        },
      };
    }).toList();
  }
  yield {
    'choices': [
      {
        'delta': {
          'role': 'assistant',
          'content': finalText.isEmpty ? null : finalText,
          if (toolCalls != null) 'tool_calls': toolCalls,
        },
        // Map Anthropic's 'tool_use' to the OpenAI-style 'tool_calls'.
        'finish_reason': stopReason == 'tool_use' ? 'tool_calls' : stopReason,
      },
    ],
    'usage': {
      'prompt_tokens': inputTokens,
      'completion_tokens': outputTokens,
      'total_tokens': inputTokens + outputTokens,
    },
  };
}