loadConversation method

Future<void> loadConversation(String conversationId)

Implementation

/// Loads the conversation identified by [conversationId] and its messages.
///
/// Resets the current chat state, fetches the conversation through
/// `ConversationService.getConversation`, and then fills [messages] in one of
/// two ways:
///
/// * If `SSEService.createConversationSseGetStream` returns `null`, messages
///   are loaded through REST pagination (`loadConversationMessages`).
/// * Otherwise the stream is subscribed to: the first `history` event
///   replaces [messages], and later events are handled as catch-up/live
///   updates that toggle [isStreaming] and [assistantsReplying].
///
/// When the conversation cannot be fetched (`null`), the loading flag is
/// cleared and the method returns without emitting any event. On success a
/// `conversationChanged` event is emitted. Any exception thrown along the way
/// is caught, reported as an `error` event, and not rethrown.
Future<void> loadConversation(String conversationId) async {
  try {
    // Shared tail of the REST and SSE paths: clear the loading flag, mark the
    // history as loaded, refresh the UI and announce the conversation change.
    // [markLastPage] additionally flags the current page as the last one,
    // which only the SSE path does.
    void finishLoading({bool markLastPage = false}) {
      if (conversation.value != null && messages.length > 1) {
        _isFirstMessage = false;
      }
      isLoadingConversation.value = false;
      isConversationHistoryLoaded = true;
      if (markLastPage) isConversationLastPage = true;
      update();
      scrollToBottomChat();
      PupauEventService.instance.emitPupauEvent(
        PupauEvent(
          type: UpdateConversationType.conversationChanged,
          payload: {
            "assistantId": assistantId,
            "assistantType": assistant.value?.type ?? AssistantType.assistant,
            // Throws if the conversation was cleared while loading; the outer
            // catch then reports it as an error event.
            "conversation": conversation.value!,
          },
        ),
      );
    }

    // Drop all state belonging to the previously shown conversation,
    // including any SSE subscription that is still open.
    resetConversation();
    assistantsReplying.value = 0;
    isStreaming.value = false;
    incomingMessages.clear();
    conversationSseSubscription?.cancel();
    conversationSseSubscription = null;
    messageNotifier = MessageNotifier();
    isLoadingConversation.value = true;
    loadingMessage.value = LoadingMessage(
      message: "",
      loadingType: LoadingType.dots,
    );
    update();
    conversation.value = await ConversationService.getConversation(
      assistantId,
      conversationId,
      isMarketplace,
    );
    if (conversation.value == null) {
      // Nothing to show: stop the loading indicator and bail out silently.
      isLoadingConversation.value = false;
      isConversationHistoryLoaded = false;
      update();
      return;
    }
    Get.find<PupauAttachmentsController>().loadAttachments();

    // Initialize notifier for possible catch-up/live updates.
    messageNotifier.setAssistantId(assistantId);
    messageNotifier.setConversationId(conversation.value!.id);

    // Try async SSE history + catch-up (feature gated on server).
    final String? lastEventId = PupauSharedPreferences.getLastEventId(
      conversation.value?.id ?? "",
    );
    final Stream<SSEModel>? sseStream =
        await SSEService.createConversationSseGetStream(
          assistantId,
          conversation.value!.id,
          conversation.value!.token,
          lastEventId: lastEventId,
          chatController: this,
        );

    if (sseStream == null) {
      // Fallback to previous REST pagination behavior.
      // Initialize pagination so that the newest messages are loaded first.
      _initializeConversationPagination();
      await loadConversationMessages(reset: true);
      finishLoading();
      return;
    }

    // SSE flow: first event is always `history`, then catch-up/live events.
    isConversationHistoryLoaded = false;
    isConversationLastPage = true;
    conversationPage = 0;
    conversationItemsLoaded = 0;
    resetLoadingMessage();
    update();

    // `historyLoaded` makes sure only the first `history` event is applied;
    // `historyCompleter` lets this method wait until that event (or the end
    // of the stream, or an error) has been seen.
    // NOTE(review): if the stream stays open without ever sending `history`,
    // the await below never completes and the loading flag stays set.
    bool historyLoaded = false;
    final Completer<void> historyCompleter = Completer<void>();

    conversationSseSubscription = sseStream.listen(
      (SSEModel sseEvent) async {
        _bumpSseIdleTimer();
        final String? data = sseEvent.data;
        setLastEventId(sseEvent);
        if (sseEvent.event == 'history') {
          if (historyLoaded) return;
          historyLoaded = true;

          messages.clear();
          incomingMessages.clear();

          try {
            // A missing body is treated as an empty history; any non-list
            // payload is treated the same way.
            final dynamic decoded = jsonDecode(data ?? '[]');
            final List<dynamic> items = decoded is List
                ? decoded
                : <dynamic>[];

            for (final dynamic raw in items) {
              if (raw is! Map<String, dynamic>) continue;
              final Map<String, dynamic> item = Map<String, dynamic>.from(
                raw,
              );

              // Use the exact same message shaping as REST pagination,
              // so markdown/thinking/tool-use elements render identically.
              final PupauMessage loadedMessage = PupauMessage.fromLoadedChat(
                item,
              );
              final PupauMessage userMessage =
                  MessageService.getUserLoadedMessage(loadedMessage);
              final PupauMessage assistantMessage =
                  MessageService.getAssistantLoadedMessage(loadedMessage);

              if (isFirstMessageInGroup(loadedMessage.groupId)) {
                messages.insert(0, userMessage);
              }
              messages.insert(0, assistantMessage);
              incomingMessages.add(assistantMessage);
            }
          } catch (_) {
            // Best-effort: both lists were already cleared above, so after a
            // parse failure only the items inserted before the failure
            // remain.
          }

          messages.refresh();
          update();
          isConversationHistoryLoaded = true;
          isConversationLastPage = true;

          // Only enter streaming mode immediately when history is not empty.
          // If history is empty and no further events arrive, keep non-streaming.
          if (messages.isNotEmpty) {
            assistantsReplying.value = 1;
            isStreaming.value = true;
            _bumpSseIdleTimer();
          } else {
            assistantsReplying.value = 0;
            isStreaming.value = false;
          }
          update();

          if (!historyCompleter.isCompleted) {
            historyCompleter.complete();
          }
          return;
        }

        // Events that arrive before `history`, or that carry no data, are
        // ignored.
        if (!historyLoaded) return;
        if (data == null || data.trim().isEmpty) return;

        try {
          // Async SSE reconnection payloads are wrapped:
          // { eventType: "...", payload: {...}, ... }
          // Anything that is not a JSON object is ignored.
          final Object? decoded = jsonDecode(data);
          if (decoded is! Map<String, dynamic>) return;

          final String eventType = (decoded['eventType']?.toString() ?? '')
              .trim();

          // Terminal async events: stop streaming UI.
          const List<String> terminalTypes = <String>[
            'run_completed',
            'run_stopped',
            'run_error',
          ];
          if (terminalTypes.contains(eventType)) {
            assistantsReplying.value = 0;
            isStreaming.value = false;
            resetLoadingMessage();
            update();
            return;
          }

          // Any non-terminal event after history means the run is active.
          // Mirror the regular sendMessage() behavior (assistantsReplying=1).
          if (!isStreaming.value) {
            assistantsReplying.value = 1;
            isStreaming.value = true;
            update();
          }

          if (eventType == 'message') {
            // A `message` event without an object payload is skipped.
            final Object? payload = decoded['payload'];
            if (payload is Map<String, dynamic>) {
              manageSSEData(payload, false);
            }
          }
        } catch (_) {
          // Best-effort: a malformed event (invalid JSON) or a failure while
          // handling it must not break the subscription, so it is dropped.
        }
      },
      onError: (e) {
        _cancelSseIdleTimer();
        // Unblock the caller even when the error arrives before `history`.
        // NOTE(review): in that case the code after the await still marks
        // the history as loaded although nothing was received — confirm this
        // is intended.
        if (!historyCompleter.isCompleted) {
          historyCompleter.complete();
        }
        showErrorSnackbar(
          "${Strings.apiErrorGeneric.tr} ${Strings.checkConnectionOrRetry.tr}",
        );
      },
      onDone: () {
        _cancelSseIdleTimer();
        // If the run completed/stopped or the server closes the stream,
        // we consider streaming finished.
        assistantsReplying.value = 0;
        isStreaming.value = false;
        resetLoadingMessage();
        update();
        if (!historyCompleter.isCompleted) {
          historyCompleter.complete();
        }
      },
      // Keep listening after a stream error; only `onDone` or an explicit
      // cancel ends the subscription.
      cancelOnError: false,
    );

    await historyCompleter.future;

    finishLoading(markLastPage: true);
  } catch (e) {
    final String errorMessage = 'Failed to load conversation: $e';
    PupauEventService.instance.emitPupauEvent(
      PupauEvent(
        type: UpdateConversationType.error,
        payload: {
          "error": errorMessage,
          "assistantId": assistantId,
          "assistantType": assistant.value?.type ?? AssistantType.assistant,
          "conversationId": conversationId,
        },
      ),
    );
    isLoadingConversation.value = false;
    update();
  }
}