subscribe method
void
subscribe(})
Subscribes to the WebSocket message stream with custom event handlers.
subscriptionName: Unique name for this subscription (used for management)
timeout: Optional timeout in seconds (default: 30s + 20s buffer)
onMessage: Called when a message is received
onError: Called if an error occurs
onDone: Called when the stream is closed
onTimeOut: Called if no message is received within the timeout
onSessionTimeOut: Called if the WebSocket session times out (closeCode == 1005)
If the WebSocket is not initialized, this method does nothing.
Implementation
void subscribe(String subscriptionName, int? timeout,
{required Function(String? response) onMessage,
required Function(String? error) onError,
required Function() onDone,
required Function() onTimeOut,
required Function() onSessionTimeOut}) {
if (_channel == null || _streamController == null) {
debugPrint("⚠️ WebSocket not initialized. Call initWebSocket() first.");
return;
}
// If the server closed the connection with code 1005, treat as session timeout
if (_channel!.closeCode == 1005) {
debugPrint("⚠️ WebSocket session timeout.");
closeWebSocket();
onSessionTimeOut();
return;
}
// Start a timer to enforce a timeout for this subscription
Timer timeoutTimer = Timer(Duration(seconds: (timeout ?? 30) + 20), () {
WebSocketManager().unsubscribe(subscriptionName);
onTimeOut();
});
// Prevent duplicate subscriptions with the same name
if (_subscriptions.containsKey(subscriptionName)) {
debugPrint("⚠️ Already subscribed to $subscriptionName.");
return;
}
debugPrint(
"📢 Subscribing to $subscriptionName... Listeners: [33m [1m [4m [0m");
// Listen to the broadcast stream for messages
final subscription = _streamController!.stream.listen(
(message) {
timeoutTimer.cancel(); // Cancel the timer on message
debugPrint("📌 [$subscriptionName] Message Received: $message");
onMessage(message);
},
onError: (error) {
timeoutTimer.cancel();
debugPrint("❌ [$subscriptionName] Error: $error");
onError(error);
},
onDone: () {
timeoutTimer.cancel();
debugPrint("🔻 [$subscriptionName] Stream closed.");
onDone();
},
);
// Store the subscription for later management
_subscriptions[subscriptionName] = subscription;
}