loadConversation method
Implementation
/// Loads the conversation identified by [conversationId] into this controller.
///
/// The current conversation state is reset first, then the conversation is
/// fetched through `ConversationService.getConversation`. If nothing is
/// returned the loading flags are cleared and the method returns without
/// emitting any event.
///
/// Messages are then populated through one of two paths:
///
/// * **SSE** – when `SSEService.createConversationSseGetStream` returns a
///   stream, the first `history` event replaces [messages]; subsequent events
///   either end the streaming state (terminal event types) or are forwarded
///   to [manageSSEData] (`message` events).
/// * **REST** – when no stream is returned, messages are loaded through
///   [loadConversationMessages].
///
/// On success an `UpdateConversationType.conversationChanged` event is
/// emitted. Any exception is caught and reported as an
/// `UpdateConversationType.error` event instead of being rethrown.
Future<void> loadConversation(String conversationId) async {
  // Shared tail of the REST and SSE paths: clears the loading state, scrolls
  // to the newest message and notifies listeners that the conversation
  // changed. Only the SSE path marks the conversation as fully paginated.
  void finishLoading({required bool markLastPage}) {
    if (conversation.value != null && messages.length > 1) {
      _isFirstMessage = false;
    }
    isLoadingConversation.value = false;
    isConversationHistoryLoaded = true;
    if (markLastPage) {
      isConversationLastPage = true;
    }
    update();
    scrollToBottomChat();
    PupauEventService.instance.emitPupauEvent(
      PupauEvent(
        type: UpdateConversationType.conversationChanged,
        payload: {
          "assistantId": assistantId,
          "assistantType": assistant.value?.type ?? AssistantType.assistant,
          // Throws if the conversation was cleared while loading; the caller's
          // try/catch turns that into an error event.
          "conversation": conversation.value!,
        },
      ),
    );
  }

  try {
    // Drop every trace of the previously displayed conversation, including
    // any live SSE subscription, before starting the new load.
    resetConversation();
    assistantsReplying.value = 0;
    isStreaming.value = false;
    incomingMessages.clear();
    conversationSseSubscription?.cancel();
    conversationSseSubscription = null;
    messageNotifier = MessageNotifier();
    isLoadingConversation.value = true;
    loadingMessage.value = LoadingMessage(
      message: "",
      loadingType: LoadingType.dots,
    );
    update();

    conversation.value = await ConversationService.getConversation(
      assistantId,
      conversationId,
      isMarketplace,
    );
    final loaded = conversation.value;
    if (loaded == null) {
      isLoadingConversation.value = false;
      isConversationHistoryLoaded = false;
      update();
      return;
    }

    Get.find<PupauAttachmentsController>().loadAttachments();
    // Initialize notifier for possible catch-up/live updates.
    messageNotifier.setAssistantId(assistantId);
    messageNotifier.setConversationId(loaded.id);

    // Try async SSE history + catch-up (feature gated on server).
    final String? lastEventId = PupauSharedPreferences.getLastEventId(
      loaded.id,
    );
    final Stream<SSEModel>? sseStream =
        await SSEService.createConversationSseGetStream(
          assistantId,
          loaded.id,
          loaded.token,
          lastEventId: lastEventId,
          chatController: this,
        );

    if (sseStream == null) {
      // Fallback to previous REST pagination behavior.
      // Initialize pagination so that the newest messages are loaded first.
      _initializeConversationPagination();
      await loadConversationMessages(reset: true);
      finishLoading(markLastPage: false);
      return;
    }

    // SSE flow: first event is always `history`, then catch-up/live events.
    isConversationHistoryLoaded = false;
    isConversationLastPage = true;
    conversationPage = 0;
    conversationItemsLoaded = 0;
    resetLoadingMessage();
    update();

    bool historyLoaded = false;
    final Completer<void> historyCompleter = Completer<void>();

    // Unblocks the `await` below exactly once, whichever callback gets there
    // first (history event, stream error or stream end).
    void completeHistory() {
      if (!historyCompleter.isCompleted) {
        historyCompleter.complete();
      }
    }

    conversationSseSubscription = sseStream.listen(
      (SSEModel sseEvent) {
        _bumpSseIdleTimer();
        final String? data = sseEvent.data;
        setLastEventId(sseEvent);

        if (sseEvent.event == 'history') {
          // Only the first history event is applied.
          if (historyLoaded) return;
          historyLoaded = true;
          messages.clear();
          incomingMessages.clear();
          try {
            final dynamic decoded = jsonDecode(data ?? '[]');
            final List<dynamic> items = decoded is List
                ? decoded
                : <dynamic>[];
            // Entries that are not JSON objects are skipped.
            for (final Map<String, dynamic> raw
                in items.whereType<Map<String, dynamic>>()) {
              final Map<String, dynamic> item = Map<String, dynamic>.from(
                raw,
              );
              // Use the exact same message shaping as REST pagination,
              // so markdown/thinking/tool-use elements render identically.
              final PupauMessage loadedMessage = PupauMessage.fromLoadedChat(
                item,
              );
              final PupauMessage userMessage =
                  MessageService.getUserLoadedMessage(loadedMessage);
              final PupauMessage assistantMessage =
                  MessageService.getAssistantLoadedMessage(loadedMessage);
              if (isFirstMessageInGroup(loadedMessage.groupId)) {
                messages.insert(0, userMessage);
              }
              messages.insert(0, assistantMessage);
              incomingMessages.add(assistantMessage);
            }
          } catch (_) {
            // Best-effort: the lists were already cleared above, so a parsing
            // failure leaves only the entries inserted before the failure.
          }
          messages.refresh();
          update();
          isConversationHistoryLoaded = true;
          isConversationLastPage = true;
          // Only enter streaming mode immediately when history is not empty.
          // If history is empty and no further events arrive, keep
          // non-streaming.
          if (messages.isNotEmpty) {
            assistantsReplying.value = 1;
            isStreaming.value = true;
            _bumpSseIdleTimer();
          } else {
            assistantsReplying.value = 0;
            isStreaming.value = false;
          }
          update();
          completeHistory();
          return;
        }

        // Ignore anything that arrives before history, and empty payloads.
        if (!historyLoaded) return;
        if (data == null || data.trim().isEmpty) return;

        try {
          final Map<String, dynamic> decoded =
              jsonDecode(data) as Map<String, dynamic>;
          // Async SSE reconnection payloads are wrapped:
          // { eventType: "...", payload: {...}, ... }
          final String eventType = (decoded['eventType']?.toString() ?? '')
              .trim();

          // Terminal async events: stop streaming UI.
          const Set<String> terminalTypes = <String>{
            'run_completed',
            'run_stopped',
            'run_error',
          };
          if (terminalTypes.contains(eventType)) {
            assistantsReplying.value = 0;
            isStreaming.value = false;
            resetLoadingMessage();
            update();
            return;
          }

          // Any non-terminal event after history means the run is active.
          // Mirror the regular sendMessage() behavior (assistantsReplying=1).
          if (!isStreaming.value) {
            assistantsReplying.value = 1;
            isStreaming.value = true;
            update();
          }

          if (eventType == 'message') {
            final Map<String, dynamic> payload =
                decoded['payload'] as Map<String, dynamic>;
            manageSSEData(payload, false);
            return;
          }
        } catch (_) {
          // Best-effort: a malformed or unexpected event is dropped so that a
          // single bad payload does not interrupt the live stream.
        }
      },
      onError: (e) {
        _cancelSseIdleTimer();
        completeHistory();
        showErrorSnackbar(
          "${Strings.apiErrorGeneric.tr} ${Strings.checkConnectionOrRetry.tr}",
        );
      },
      onDone: () {
        _cancelSseIdleTimer();
        // If the run completed/stopped or the server closes the stream,
        // we consider streaming finished.
        assistantsReplying.value = 0;
        isStreaming.value = false;
        resetLoadingMessage();
        update();
        completeHistory();
      },
      cancelOnError: false,
    );

    // NOTE(review): cancelling a subscription does not invoke `onDone`, so if
    // this subscription is cancelled before the history event arrives (for
    // example by a second loadConversation call) this await never completes.
    // Also, when the stream errors or closes before `history`, the code below
    // still marks the history as loaded — TODO confirm whether a REST
    // fallback is wanted in that case.
    await historyCompleter.future;
    finishLoading(markLastPage: true);
  } catch (e) {
    final String errorMessage = 'Failed to load conversation: $e';
    PupauEventService.instance.emitPupauEvent(
      PupauEvent(
        type: UpdateConversationType.error,
        payload: {
          "error": errorMessage,
          "assistantId": assistantId,
          "assistantType": assistant.value?.type ?? AssistantType.assistant,
          "conversationId": conversationId,
        },
      ),
    );
    isLoadingConversation.value = false;
    update();
  }
}