convertProviderStream function

Stream<StreamUpdate> convertProviderStream(
  1. Stream<StreamEvent> providerStream
)

Converts a stream of StreamEvent values (the event type defined in api_provider.dart) into the higher-level StreamUpdate sequence used by this module.

This enables interop: callers using ApiProvider.createMessageStream can pipe its events through this converter to get StreamUpdates.

Implementation

/// Adapts a provider-level [StreamEvent] stream into [StreamUpdate]s.
///
/// Every event except [ErrorEvent] is replayed into a private
/// [StreamAssembler] under its wire-style event name (`message_start`,
/// `content_block_start`, ...). The assembler's accumulated state is read
/// back only when the message stops, to populate [MessageComplete].
///
/// Updates emitted per incoming event:
///  * [MessageStartEvent]      -> [MessageStartUpdate]
///  * [ContentBlockStartEvent] -> [ToolUseStart], only for a [ToolUseBlock]
///  * [ContentBlockDeltaEvent] -> [TextDelta]
///  * [ContentBlockStopEvent]  -> nothing (assembler is still notified)
///  * [MessageDeltaEvent]      -> [UsageUpdate], only when usage is present
///  * [MessageStopEvent]       -> [MessageComplete]
///  * [ErrorEvent]             -> [StreamError]
///
/// The returned stream ends when [providerStream] ends; an [ErrorEvent] does
/// not terminate the loop by itself.
Stream<StreamUpdate> convertProviderStream(
  Stream<StreamEvent> providerStream,
) async* {
  final tracker = StreamAssembler();

  await for (final incoming in providerStream) {
    switch (incoming) {
      case MessageStartEvent(messageId: final id, model: final modelName):
        tracker.processEvent('message_start', {
          'message': {'id': id, 'model': modelName},
        });
        yield MessageStartUpdate(messageId: id, model: modelName);

      case ContentBlockStartEvent(index: final position, block: final started):
        tracker.processEvent('content_block_start', {
          'index': position,
          'content_block': _contentBlockToMap(started),
        });
        // Only tool-use blocks produce a start update; any other block kind
        // is recorded by the assembler without emitting anything.
        if (started case ToolUseBlock(name: final toolName, id: final toolId)) {
          yield ToolUseStart(
            toolName: toolName,
            toolId: toolId,
            blockIndex: position,
          );
        }

      case ContentBlockDeltaEvent(index: final position, text: final chunk):
        // Every delta is forwarded to the assembler as a `text_delta`.
        tracker.processEvent('content_block_delta', {
          'index': position,
          'delta': {'type': 'text_delta', 'text': chunk},
        });
        yield TextDelta(text: chunk, blockIndex: position);

      case ContentBlockStopEvent(index: final position):
        tracker.processEvent('content_block_stop', {'index': position});

      case MessageDeltaEvent(stopReason: final reason, usage: final tokenUsage):
        // The `delta` map is left empty when there is no stop reason, and the
        // `usage` entry is omitted entirely when no usage was reported.
        final payload = <String, dynamic>{
          'delta': {if (reason != null) 'stop_reason': reason.name},
          if (tokenUsage != null)
            'usage': {'output_tokens': tokenUsage.outputTokens},
        };
        // The usage update is emitted before the assembler sees the delta.
        if (tokenUsage != null) {
          yield UsageUpdate(
            inputTokens: tokenUsage.inputTokens,
            outputTokens: tokenUsage.outputTokens,
            cacheCreationInputTokens: tokenUsage.cacheCreationInputTokens,
            cacheReadInputTokens: tokenUsage.cacheReadInputTokens,
          );
        }
        tracker.processEvent('message_delta', payload);

      case MessageStopEvent():
        tracker.processEvent('message_stop', {});
        // Final summary comes from whatever the assembler accumulated.
        yield MessageComplete(
          stopReason: tracker.state.stopReason,
          messageId: tracker.state.messageId,
          model: tracker.state.model,
        );

      case ErrorEvent(message: final description, type: final kind):
        // Errors bypass the assembler and are surfaced directly.
        yield StreamError(message: description, errorType: kind);
    }
  }
}