receiveMessage method

MessageWithRecipient? receiveMessage(
  int sessionID,
  Message message
)

Implementation

/// Handles a [Subscribe] or [UnSubscribe] message sent by session [sessionID]
/// and returns the acknowledgement addressed back to that same session.
///
/// * [Subscribe]: adds the session to the subscription for `message.topic`,
///   creating the subscription (with an id from `_idGen`) when the topic has
///   none yet, and replies with [Subscribed].
/// * [UnSubscribe]: removes the session from the subscription identified by
///   `message.subscriptionID`, drops the topic entry once it has no
///   subscribers left, and replies with [UnSubscribed].
///
/// Throws an [Exception] when the session is not present in
/// `_subscriptionsBySession`, when the subscription to remove is not
/// registered for that session, or when [message] is of any other type.
///
/// Every non-throwing path in this method returns a non-null value.
MessageWithRecipient? receiveMessage(int sessionID, Message message) {
  if (message is Subscribe) {
    if (!_subscriptionsBySession.containsKey(sessionID)) {
      throw Exception("cannot subscribe, session $sessionID doesn't exist");
    }

    // One subscription per topic: reuse it if present, otherwise create it
    // with this session as its first subscriber.
    var subscription = _subscriptionsByTopic[message.topic];
    if (subscription == null) {
      subscription = Subscription(_idGen.next(), message.topic, {sessionID: sessionID});
      _subscriptionsByTopic[message.topic] = subscription;
    } else {
      subscription.subscribers[sessionID] = sessionID;
    }

    // Index the subscription under the session so it can be found by id on
    // unsubscribe.
    _subscriptionsBySession.putIfAbsent(sessionID, () => {})[subscription.id] = subscription;

    final subscribed = Subscribed(message.requestID, subscription.id);
    return MessageWithRecipient(subscribed, sessionID);
  } else if (message is UnSubscribe) {
    if (!_subscriptionsBySession.containsKey(sessionID)) {
      throw Exception("cannot unsubscribe, session $sessionID doesn't exist");
    }

    final subscriptions = _subscriptionsBySession[sessionID];
    if (subscriptions == null) {
      throw Exception("cannot unsubscribe, no subscription found");
    }

    final subscription = subscriptions[message.subscriptionID];
    if (subscription == null) {
      // `${...}` is required here: `$message.subscriptionID` would interpolate
      // only `message` and append the literal text ".subscriptionID".
      throw Exception("cannot unsubscribe, subscription ${message.subscriptionID} doesn't exist");
    }

    subscription.subscribers.remove(sessionID);
    if (subscription.subscribers.isEmpty) {
      // Last subscriber left: forget the topic entirely.
      _subscriptionsByTopic.remove(subscription.topic);
    }

    subscriptions.remove(message.subscriptionID);

    final unSubscribed = UnSubscribed(message.requestID);
    return MessageWithRecipient(unSubscribed, sessionID);
  } else {
    throw Exception("message type not supported");
  }
}