createMessageStream method

@override
Stream<StreamEvent> createMessageStream({
  required List<Message> messages,
  required String systemPrompt,
  List<ToolDefinition> tools = const [],
  int? maxTokens,
})
override

Streams a chat completion, translating OpenAI SSE events into Anthropic-format stream events.

Implementation

/// Streams a chat completion from an OpenAI-compatible endpoint, translating
/// the OpenAI SSE chunks into Anthropic-style [StreamEvent]s.
///
/// [messages] and [systemPrompt] are combined by `_expandMessages` and POSTed
/// to `${config.baseUrl}/chat/completions` with `stream: true`. [tools] are
/// only sent when non-empty, and [maxTokens] falls back to `config.maxTokens`
/// when null.
///
/// On a non-200 response a single [ErrorEvent] carrying the status code and
/// response body is emitted and the stream ends. Otherwise the stream emits a
/// [MessageStartEvent], then content-block start/delta/stop events for text
/// and tool calls, and - once a chunk carries a `finish_reason` - a
/// [MessageDeltaEvent] followed by a [MessageStopEvent].
///
/// The underlying HTTP client is closed when the stream completes, fails, or
/// is cancelled by the listener.
@override
Stream<StreamEvent> createMessageStream({
  required List<Message> messages,
  required String systemPrompt,
  List<ToolDefinition> tools = const [],
  int? maxTokens,
}) async* {
  final openAiMessages = _expandMessages(messages, systemPrompt);

  final body = <String, dynamic>{
    'model': config.model,
    'messages': openAiMessages,
    'stream': true,
    'max_tokens': maxTokens ?? config.maxTokens,
  };

  if (tools.isNotEmpty) {
    body['tools'] = tools.map((t) => t.toOpenAiMap()).toList();
  }

  final url = '${config.baseUrl}/chat/completions';

  // Mask the key for logging. Keys of 8 characters or fewer are fully masked:
  // slicing 4 + 4 characters would either throw a RangeError (length < 4) or
  // reveal the entire key (overlapping slices).
  final apiKey = config.apiKey;
  final String maskedKey;
  if (apiKey == null) {
    maskedKey = 'none';
  } else if (apiKey.length <= 8) {
    maskedKey = '****';
  } else {
    maskedKey =
        '${apiKey.substring(0, 4)}...${apiKey.substring(apiKey.length - 4)}';
  }
  SintSentinel.logger.d('OpenAiShim.createMessageStream: POST $url (key: $maskedKey)');

  final request = http.Request('POST', Uri.parse(url));
  request.headers.addAll(_headers);
  request.body = jsonEncode(body);

  final client = http.Client();
  try {
    final response = await SintSentinel.guard(
      () => client.send(request),
      tag: 'OpenAiShim.createMessageStream',
    );

    SintSentinel.logger.d('OpenAiShim response status: ${response.statusCode}');

    if (response.statusCode != 200) {
      final errorBody = await response.stream.bytesToString();
      SintSentinel.logger.e('OpenAiShim API error ${response.statusCode}: $errorBody');
      yield ErrorEvent(
        message: 'OpenAI API error ${response.statusCode}: $errorBody',
        type: 'api_error',
      );
      return;
    }

    // Emit synthetic Anthropic-format events from OpenAI stream
    final messageId = 'msg_${_uuid.v4()}';
    yield MessageStartEvent(messageId: messageId, model: config.model);

    // Index of the text block while it is open; once the text block has been
    // closed it becomes the base offset for tool-call blocks.
    var currentIndex = 0;
    var hasStartedText = false;
    // Tool calls seen so far, keyed by the OpenAI `index` of the call.
    final toolCallBuffers = <int, _ToolCallBuffer>{};

    await for (final event in _parseOpenAiSSE(response.stream)) {
      final choices = event['choices'] as List?;
      if (choices == null || choices.isEmpty) continue;

      final delta = choices[0]['delta'] as Map<String, dynamic>?;
      if (delta == null) continue;

      // Handle text content
      final content = delta['content'] as String?;
      if (content != null && content.isNotEmpty) {
        if (!hasStartedText) {
          // Lazily open the text block on the first non-empty fragment.
          yield ContentBlockStartEvent(
            index: currentIndex,
            block: const TextBlock(''),
          );
          hasStartedText = true;
        }
        yield ContentBlockDeltaEvent(index: currentIndex, text: content);
      }

      // Handle tool calls
      final toolCalls = delta['tool_calls'] as List?;
      if (toolCalls != null) {
        for (final tc in toolCalls) {
          final tcMap = tc as Map<String, dynamic>;
          final tcIndex = tcMap['index'] as int;

          if (!toolCallBuffers.containsKey(tcIndex)) {
            // Close text block if open
            if (hasStartedText) {
              yield ContentBlockStopEvent(index: currentIndex);
              currentIndex++;
              hasStartedText = false;
            }

            // First fragment of this call: fall back to a generated id and
            // an empty name when the chunk does not provide them.
            toolCallBuffers[tcIndex] = _ToolCallBuffer(
              id: tcMap['id'] as String? ?? 'call_${_uuid.v4()}',
              name:
                  (tcMap['function'] as Map<String, dynamic>?)?['name']
                      as String? ??
                  '',
              argumentsBuffer: StringBuffer(),
            );

            yield ContentBlockStartEvent(
              index: currentIndex + tcIndex,
              block: ToolUseBlock(
                id: toolCallBuffers[tcIndex]!.id,
                name: toolCallBuffers[tcIndex]!.name,
                input: {},
              ),
            );
          }

          // Argument JSON arrives in fragments: accumulate it in the buffer
          // and forward each fragment as a delta.
          final args =
              (tcMap['function'] as Map<String, dynamic>?)?['arguments']
                  as String?;
          if (args != null) {
            toolCallBuffers[tcIndex]!.argumentsBuffer.write(args);
            yield ContentBlockDeltaEvent(
              index: currentIndex + tcIndex,
              text: args,
            );
          }
        }
      }

      // Handle finish
      final finishReason = choices[0]['finish_reason'] as String?;
      if (finishReason != null) {
        // Close whatever blocks are still open before ending the message.
        if (hasStartedText) {
          yield ContentBlockStopEvent(index: currentIndex);
        }
        for (final tcIndex in toolCallBuffers.keys) {
          yield ContentBlockStopEvent(index: currentIndex + tcIndex);
        }

        // Unknown finish reasons are treated as a normal end of turn.
        final stopReason = switch (finishReason) {
          'stop' => StopReason.endTurn,
          'length' => StopReason.maxTokens,
          'tool_calls' => StopReason.toolUse,
          _ => StopReason.endTurn,
        };

        // NOTE(review): usage is only read from the chunk that carries
        // `finish_reason`. Chunks with empty `choices` are skipped above, so
        // usage delivered in a separate trailing chunk would not be
        // reported - confirm against the provider's streaming behaviour.
        final usage = event['usage'] as Map<String, dynamic>?;
        yield MessageDeltaEvent(
          stopReason: stopReason,
          usage: usage != null
              ? TokenUsage(
                  inputTokens: usage['prompt_tokens'] as int? ?? 0,
                  outputTokens: usage['completion_tokens'] as int? ?? 0,
                )
              : null,
        );
        yield const MessageStopEvent();
      }
    }
  } finally {
    // Release the connection on every exit path: error response, normal
    // completion, thrown error, or listener cancellation.
    client.close();
  }
}