subscribe method

void subscribe(
  1. String subscriptionName,
  2. int? timeout, {
  3. required dynamic onMessage(
    1. String? response
    ),
  4. required dynamic onError(
    1. String? error
    ),
  5. required dynamic onDone(),
  6. required dynamic onTimeOut(),
  7. required dynamic onSessionTimeOut(),
})

Subscribes to the WebSocket message stream with custom event handlers.

subscriptionName: Unique name for this subscription (used for management) timeout: Optional timeout in seconds (default: 30s + 20s buffer) onMessage: Called when a message is received onError: Called if an error occurs onDone: Called when the stream is closed onTimeOut: Called if no message is received within the timeout onSessionTimeOut: Called if the WebSocket session times out (closeCode == 1005)

If the WebSocket is not initialized, this method does nothing.

Implementation

void subscribe(String subscriptionName, int? timeout,
    {required Function(String? response) onMessage,
      required Function(String? error) onError,
      required Function() onDone,
      required Function() onTimeOut,
      required Function() onSessionTimeOut}) {
  if (_channel == null || _streamController == null) {
    debugPrint("⚠️ WebSocket not initialized. Call initWebSocket() first.");
    return;
  }

  // If the server closed the connection with code 1005, treat as session timeout
  if (_channel!.closeCode == 1005) {
    debugPrint("⚠️ WebSocket session timeout.");
    closeWebSocket();
    onSessionTimeOut();
    return;
  }

  // Start a timer to enforce a timeout for this subscription
  Timer timeoutTimer = Timer(Duration(seconds: (timeout ?? 30) + 20), () {
    WebSocketManager().unsubscribe(subscriptionName);
    onTimeOut();
  });

  // Prevent duplicate subscriptions with the same name
  if (_subscriptions.containsKey(subscriptionName)) {
    debugPrint("⚠️ Already subscribed to $subscriptionName.");
    return;
  }

  debugPrint(
      "📢 Subscribing to $subscriptionName... Listeners:  [33m [1m [4m [0m");

  // Listen to the broadcast stream for messages
  final subscription = _streamController!.stream.listen(
        (message) {
      timeoutTimer.cancel(); // Cancel the timer on message
      debugPrint("📌 [$subscriptionName] Message Received: $message");
      onMessage(message);
    },
    onError: (error) {
      timeoutTimer.cancel();
      debugPrint("❌ [$subscriptionName] Error: $error");
      onError(error);
    },
    onDone: () {
      timeoutTimer.cancel();
      debugPrint("🔻 [$subscriptionName] Stream closed.");
      onDone();
    },
  );

  // Store the subscription for later management
  _subscriptions[subscriptionName] = subscription;
}