manageSSEData method

void manageSSEData(
  Map<String, dynamic> data,
  bool isExternalSearch
)

Implementation

/// Processes a single decoded SSE event for the current conversation.
///
/// [data] is parsed into a [PupauMessage] via [PupauMessage.fromSseStream].
/// The method then:
///
/// 1. Clears the loading placeholder and copies the event's id onto the
///    first message whose status is [MessageStatus.sent].
/// 2. Emits the `timeToFirstToken` event once per tracked message, and the
///    `tokensPerSecond` event whenever the parsed message carries
///    `contextInfo` with a positive output-token count.
/// 3. Hands tool-use, UI-tool, KB, title, layer, tool-use-start and
///    web-search-query events to their dedicated handlers and returns.
///    `layerResponse`, `webSearch` and `retry` events are dropped.
/// 4. For every remaining event, merges it through [updateSSEMessages],
///    streams non-empty answers to `messageNotifier`, checks for SSE errors,
///    optionally enables the external-search button and, when the event is
///    flagged as last, finalises the send.
///
/// [isExternalSearch] is forwarded to
/// [MessageService.canEnableExternalSearch] and [manageSendSuccess].
void manageSSEData(Map<String, dynamic> data, bool isExternalSearch) {
  final newMessage = PupauMessage.fromSseStream(data);
  resetLoadingMessage();

  // Give the first message still marked as "sent" the id carried by this
  // event, then notify listeners.
  messages
      .firstWhereOrNull((message) => message.status == MessageStatus.sent)
      ?.id = newMessage.id;
  messages.refresh();
  update();

  // Time to first token: emitted once, on the first event handled after the
  // start time was recorded.
  // NOTE(review): no heartbeat/empty-message filtering happens in this
  // method; if such events must be excluded, that has to be done by the
  // caller — confirm.
  final firstTokenStart = _currentMessageStartTime;
  if (!_hasReceivedFirstToken && firstTokenStart != null) {
    _hasReceivedFirstToken = true;
    final timeToFirstToken =
        DateTime.now().difference(firstTokenStart).inMilliseconds;
    PupauEventService.instance.emitPupauEvent(
      PupauEvent(
        type: UpdateConversationType.timeToFirstToken,
        payload: {
          "assistantId": assistantId,
          "assistantType": assistant.value?.type ?? AssistantType.assistant,
          "timeToFirstToken": timeToFirstToken,
        },
      ),
    );
  }

  // Tokens per second: computed when contextInfo is received.
  // Elapsed time is measured in milliseconds rather than whole seconds:
  // Duration.inSeconds truncates, which suppressed the event for responses
  // shorter than one second and overstated the rate for short responses.
  final contextInfo = newMessage.contextInfo;
  final throughputStart = _currentMessageStartTime;
  if (contextInfo != null && throughputStart != null) {
    final elapsedMilliseconds =
        DateTime.now().difference(throughputStart).inMilliseconds;
    if (elapsedMilliseconds > 0 && contextInfo.outputTokens > 0) {
      final tokensPerSecond = contextInfo.outputTokens *
          Duration.millisecondsPerSecond /
          elapsedMilliseconds;
      PupauEventService.instance.emitPupauEvent(
        PupauEvent(
          type: UpdateConversationType.tokensPerSecond,
          payload: {
            "assistantId": assistantId,
            "assistantType": assistant.value?.type ?? AssistantType.assistant,
            "tokensPerSecond": tokensPerSecond,
          },
        ),
      );
    }
  }

  // Events with a dedicated handler. Source-type checks run before
  // message-type checks, and each branch ends processing of this event.
  if (newMessage.sourceType == SourceType.toolUse) {
    handleToolUseMessage(data);
    return;
  }
  if (newMessage.sourceType == SourceType.uiTool) {
    handleUiToolMessage(data);
    return;
  }
  if (newMessage.type == MessageType.kb) {
    handleKbMessage(newMessage);
    return;
  }
  if (newMessage.type == MessageType.conversationTitleGenerated) {
    updateConversationTitle(title: newMessage.title);
    return;
  }
  if (newMessage.type == MessageType.layerMessage) {
    handleLayerMessage(newMessage);
    return;
  }
  if (newMessage.type == MessageType.toolUseStart) {
    handleToolUseStartMessage(newMessage);
    return;
  }
  if (newMessage.type == MessageType.webSearchQuery) {
    handleWebSearchQueryMessage(newMessage);
    return;
  }

  // These event types need no further processing here.
  if (newMessage.type == MessageType.layerResponse ||
      newMessage.type == MessageType.webSearch ||
      newMessage.type == MessageType.retry) {
    return;
  }

  // Capture emptiness before the message is passed to updateSSEMessages.
  final messageIsEmpty = newMessage.answer == "";
  final updatedMessage = updateSSEMessages(newMessage);
  if (!messageIsEmpty) {
    messageNotifier.addData(updatedMessage.answer, updatedMessage.id);
    manageChatAutoScroll();
  }

  MessageService.checkSSEErrors(newMessage);
  if (MessageService.canEnableExternalSearch(newMessage, isExternalSearch)) {
    setExternalSearchButton(true);
  }

  // The last event of a reply: one fewer assistant is replying and the send
  // is completed.
  if (newMessage.isLast == true) {
    assistantsReplying.value--;
    manageSendSuccess(isExternalSearch);
  }
}