convertProviderStream function
Convert the existing StreamEvent (from api_provider.dart) sequence into the higher-level StreamUpdate sequence used by this module.
This enables interop: callers using ApiProvider.createMessageStream can pipe its events through this converter to get StreamUpdates.
Implementation
Stream<StreamUpdate> convertProviderStream(
Stream<StreamEvent> providerStream,
) async* {
final assembler = StreamAssembler();
await for (final event in providerStream) {
switch (event) {
case MessageStartEvent(:final messageId, :final model):
assembler.processEvent('message_start', {
'message': {'id': messageId, 'model': model},
});
yield MessageStartUpdate(messageId: messageId, model: model);
case ContentBlockStartEvent(:final index, :final block):
final blockMap = _contentBlockToMap(block);
assembler.processEvent('content_block_start', {
'index': index,
'content_block': blockMap,
});
// Yield appropriate start update.
if (block is ToolUseBlock) {
yield ToolUseStart(
toolName: block.name,
toolId: block.id,
blockIndex: index,
);
}
case ContentBlockDeltaEvent(:final index, :final text):
assembler.processEvent('content_block_delta', {
'index': index,
'delta': {'type': 'text_delta', 'text': text},
});
yield TextDelta(text: text, blockIndex: index);
case ContentBlockStopEvent(:final index):
assembler.processEvent('content_block_stop', {'index': index});
case MessageDeltaEvent(:final stopReason, :final usage):
final data = <String, dynamic>{
'delta': {if (stopReason != null) 'stop_reason': stopReason.name},
};
if (usage != null) {
data['usage'] = {'output_tokens': usage.outputTokens};
yield UsageUpdate(
inputTokens: usage.inputTokens,
outputTokens: usage.outputTokens,
cacheCreationInputTokens: usage.cacheCreationInputTokens,
cacheReadInputTokens: usage.cacheReadInputTokens,
);
}
assembler.processEvent('message_delta', data);
case MessageStopEvent():
assembler.processEvent('message_stop', {});
yield MessageComplete(
stopReason: assembler.state.stopReason,
messageId: assembler.state.messageId,
model: assembler.state.model,
);
case ErrorEvent(:final message, :final type):
yield StreamError(message: message, errorType: type);
}
}
}