manageSSEData method
void
manageSSEData(
- Map<String, dynamic> data,
- bool isExternalSearch
)
Implementation
/// Processes one decoded server-sent-event payload of a streaming reply.
///
/// [data] is the raw SSE payload. It is parsed with
/// [PupauMessage.fromSseStream] and is also forwarded unchanged to the
/// handlers that work on the raw map. [isExternalSearch] is passed through to
/// [MessageService.canEnableExternalSearch] and [manageSendSuccess].
///
/// Steps, in order:
///  1. Heartbeats, tool heartbeats and `event` payloads without a type are
///     dropped before anything else happens.
///  2. The incoming id is copied onto the first message whose status is
///     still [MessageStatus.sent], then the list and the controller are
///     refreshed.
///  3. A `timeToFirstToken` event is emitted the first time a payload gets
///     here while `_hasReceivedFirstToken` is false, and a `tokensPerSecond`
///     event is emitted whenever context info with output tokens arrives.
///  4. Tool, UI-tool and special message types are dispatched to their
///     dedicated handlers; `layerResponse`, `webSearch` and `retry` are
///     ignored.
///  5. Anything else is treated as an answer chunk: it is merged through
///     [updateSSEMessages], pushed to `messageNotifier` when non-empty,
///     checked for errors, and finalised when `isLast` is true.
void manageSSEData(Map<String, dynamic> data, bool isExternalSearch) {
  final PupauMessage newMessage = PupauMessage.fromSseStream(data);
  final messageType = newMessage.type;

  // Keep-alive traffic carries no content and must not count as a token.
  // NOTE(review): tool heartbeats are discarded here, so this method never
  // calls handleToolHeartbeatEvent; the former branch for it further down was
  // unreachable and has been removed. Confirm that dropping them is intended.
  if (messageType == MessageType.heartbeat ||
      messageType == MessageType.toolHeartbeat ||
      (newMessage.sourceType == SourceType.event && messageType == null)) {
    return;
  }

  if (activeToolLoadings.isEmpty) resetLoadingMessage();

  // Give the first message still marked as sent the id carried by the stream.
  messages
      .firstWhereOrNull((message) => message.status == MessageStatus.sent)
      ?.id = newMessage.id;
  messages.refresh();
  update();

  // Time to first token: heartbeats and untyped events were filtered above,
  // so the first payload that reaches this point is counted.
  final DateTime? firstTokenStart = _currentMessageStartTime;
  if (!_hasReceivedFirstToken && firstTokenStart != null) {
    _hasReceivedFirstToken = true;
    final int timeToFirstToken =
        DateTime.now().difference(firstTokenStart).inMilliseconds;
    PupauEventService.instance.emitPupauEvent(
      PupauEvent(
        type: UpdateConversationType.timeToFirstToken,
        payload: {
          "assistantId": assistantId,
          "assistantType": assistant.value?.type ?? AssistantType.assistant,
          "timeToFirstToken": timeToFirstToken,
        },
      ),
    );
  }

  // Tokens per second, computed when context info is received. The start time
  // is re-read here (not reused from above) because the event emitted above
  // runs external code in between.
  final contextInfo = newMessage.contextInfo;
  final DateTime? throughputStart = _currentMessageStartTime;
  if (contextInfo != null && throughputStart != null) {
    // Millisecond resolution: Duration.inSeconds truncates, which inflated the
    // rate (1.9 s counted as 1 s) and skipped replies shorter than a second.
    final int elapsedMs =
        DateTime.now().difference(throughputStart).inMilliseconds;
    final outputTokens = contextInfo.outputTokens;
    if (elapsedMs > 0 && outputTokens > 0) {
      final double tokensPerSecond =
          outputTokens / (elapsedMs / Duration.millisecondsPerSecond);
      PupauEventService.instance.emitPupauEvent(
        PupauEvent(
          type: UpdateConversationType.tokensPerSecond,
          payload: {
            "assistantId": assistantId,
            "assistantType": assistant.value?.type ?? AssistantType.assistant,
            "tokensPerSecond": tokensPerSecond,
          },
        ),
      );
    }
  }

  // Source-type dispatch is checked before message-type dispatch.
  if (newMessage.sourceType == SourceType.toolUse) {
    handleToolUseCompletionByMessage(newMessage);
    handleToolUseMessage(data);
    return;
  }
  if (newMessage.sourceType == SourceType.uiTool) {
    handleUiToolMessage(data);
    return;
  }

  // Message-type dispatch: each handled type ends processing of this payload.
  if (messageType == MessageType.attachmentTrimming) {
    handleAttachmentTrimmingEvent(data);
    return;
  }
  if (messageType == MessageType.kb) {
    handleKbMessage(newMessage);
    return;
  }
  if (messageType == MessageType.conversationTitleGenerated) {
    updateConversationTitle(newMessage.title ?? "");
    return;
  }
  if (messageType == MessageType.layerMessage) {
    handleLayerMessage(newMessage);
    return;
  }
  if (messageType == MessageType.toolUseStart ||
      messageType == MessageType.toolPending) {
    handleToolUseStartPendingMessage(newMessage, data);
    return;
  }
  if (messageType == MessageType.toolArgsDelta) {
    handleToolArgsDeltaEvent(data);
    return;
  }
  if (messageType == MessageType.toolPartialResult) {
    handleToolPartialResultEvent(data);
    return;
  }
  if (messageType == MessageType.webSearchQuery) {
    handleWebSearchQueryMessage(newMessage);
    return;
  }
  if (messageType == MessageType.audioInputTranscription) {
    handleAudioInputTranscription(newMessage);
    return;
  }
  if (messageType == MessageType.toolEvaluation) {
    handleToolEvaluationMessage();
    return;
  }

  // These types need no handling in this method.
  if (messageType == MessageType.layerResponse ||
      messageType == MessageType.webSearch ||
      messageType == MessageType.retry) {
    return;
  }

  // Regular answer chunk. Emptiness is evaluated on the incoming chunk before
  // it is handed to updateSSEMessages; the original ordering is kept.
  final bool messageIsEmpty = newMessage.answer == "";
  final PupauMessage updatedMessage = updateSSEMessages(newMessage);
  if (!messageIsEmpty) {
    messageNotifier.addData(updatedMessage.answer, updatedMessage.id);
    manageChatAutoScroll();
  }

  MessageService.checkSSEErrors(newMessage);
  if (MessageService.canEnableExternalSearch(newMessage, isExternalSearch)) {
    setExternalSearchButton(true);
  }

  // The last chunk closes this assistant's reply.
  if (newMessage.isLast == true) {
    assistantsReplying.value--;
    manageSendSuccess(isExternalSearch);
  }
}