createMessageStream method
Stream<StreamEvent>
createMessageStream({
- required List<
Message> messages, - required String systemPrompt,
- List<
ToolDefinition> tools = const [], - int? maxTokens,
override
Stream a chat completion, translating OpenAI SSE events to Anthropic format.
Implementation
@override
Stream<StreamEvent> createMessageStream({
required List<Message> messages,
required String systemPrompt,
List<ToolDefinition> tools = const [],
int? maxTokens,
}) async* {
final openAiMessages = _expandMessages(messages, systemPrompt);
final body = <String, dynamic>{
'model': config.model,
'messages': openAiMessages,
'stream': true,
'max_tokens': maxTokens ?? config.maxTokens,
};
if (tools.isNotEmpty) {
body['tools'] = tools.map((t) => t.toOpenAiMap()).toList();
}
final url = '${config.baseUrl}/chat/completions';
final maskedKey = config.apiKey != null
? '${config.apiKey!.substring(0, 4)}...${config.apiKey!.substring(config.apiKey!.length - 4)}'
: 'none';
SintSentinel.logger.d('OpenAiShim.createMessageStream: POST $url (key: $maskedKey)');
final request = http.Request('POST', Uri.parse(url));
request.headers.addAll(_headers);
request.body = jsonEncode(body);
final client = http.Client();
final response = await SintSentinel.guard(
() => client.send(request),
tag: 'OpenAiShim.createMessageStream',
);
SintSentinel.logger.d('OpenAiShim response status: ${response.statusCode}');
if (response.statusCode != 200) {
final errorBody = await response.stream.bytesToString();
SintSentinel.logger.e('OpenAiShim API error ${response.statusCode}: $errorBody');
yield ErrorEvent(
message: 'OpenAI API error ${response.statusCode}: $errorBody',
type: 'api_error',
);
return;
}
// Emit synthetic Anthropic-format events from OpenAI stream
final messageId = 'msg_${_uuid.v4()}';
yield MessageStartEvent(messageId: messageId, model: config.model);
var currentIndex = 0;
var hasStartedText = false;
final toolCallBuffers = <int, _ToolCallBuffer>{};
await for (final event in _parseOpenAiSSE(response.stream)) {
final choices = event['choices'] as List?;
if (choices == null || choices.isEmpty) continue;
final delta = choices[0]['delta'] as Map<String, dynamic>?;
if (delta == null) continue;
// Handle text content
final content = delta['content'] as String?;
if (content != null && content.isNotEmpty) {
if (!hasStartedText) {
yield ContentBlockStartEvent(
index: currentIndex,
block: const TextBlock(''),
);
hasStartedText = true;
}
yield ContentBlockDeltaEvent(index: currentIndex, text: content);
}
// Handle tool calls
final toolCalls = delta['tool_calls'] as List?;
if (toolCalls != null) {
for (final tc in toolCalls) {
final tcMap = tc as Map<String, dynamic>;
final tcIndex = tcMap['index'] as int;
if (!toolCallBuffers.containsKey(tcIndex)) {
// Close text block if open
if (hasStartedText) {
yield ContentBlockStopEvent(index: currentIndex);
currentIndex++;
hasStartedText = false;
}
toolCallBuffers[tcIndex] = _ToolCallBuffer(
id: tcMap['id'] as String? ?? 'call_${_uuid.v4()}',
name:
(tcMap['function'] as Map<String, dynamic>?)?['name']
as String? ??
'',
argumentsBuffer: StringBuffer(),
);
yield ContentBlockStartEvent(
index: currentIndex + tcIndex,
block: ToolUseBlock(
id: toolCallBuffers[tcIndex]!.id,
name: toolCallBuffers[tcIndex]!.name,
input: {},
),
);
}
final args =
(tcMap['function'] as Map<String, dynamic>?)?['arguments']
as String?;
if (args != null) {
toolCallBuffers[tcIndex]!.argumentsBuffer.write(args);
yield ContentBlockDeltaEvent(
index: currentIndex + tcIndex,
text: args,
);
}
}
}
// Handle finish
final finishReason = choices[0]['finish_reason'] as String?;
if (finishReason != null) {
if (hasStartedText) {
yield ContentBlockStopEvent(index: currentIndex);
}
for (final tcIndex in toolCallBuffers.keys) {
yield ContentBlockStopEvent(index: currentIndex + tcIndex);
}
final stopReason = switch (finishReason) {
'stop' => StopReason.endTurn,
'length' => StopReason.maxTokens,
'tool_calls' => StopReason.toolUse,
_ => StopReason.endTurn,
};
final usage = event['usage'] as Map<String, dynamic>?;
yield MessageDeltaEvent(
stopReason: stopReason,
usage: usage != null
? TokenUsage(
inputTokens: usage['prompt_tokens'] as int? ?? 0,
outputTokens: usage['completion_tokens'] as int? ?? 0,
)
: null,
);
yield const MessageStopEvent();
}
}
}