receiveMessage method
Implementation
/// Processes a [Subscribe] or [UnSubscribe] [message] coming from the session
/// identified by [sessionID] and returns the reply paired with that same
/// session id.
///
/// * [Subscribe]: registers the session on the subscription for
///   `message.topic`, creating that subscription with a freshly generated id
///   when the topic has none yet, and replies with a [Subscribed] carrying the
///   request id and the subscription id.
/// * [UnSubscribe]: removes the session from the subscription identified by
///   `message.subscriptionID`, drops the topic entry once no subscriber is
///   left, and replies with an [UnSubscribed] carrying the request id.
///
/// Throws an [Exception] when the session has no entry in the per-session
/// table, when the subscription to remove is not registered for the session,
/// or when [message] is of any other type.
MessageWithRecipient? receiveMessage(int sessionID, Message message) {
  if (message is Subscribe) {
    if (!_subscriptionsBySession.containsKey(sessionID)) {
      throw Exception("cannot subscribe, session $sessionID doesn't exist");
    }

    var subscription = _subscriptionsByTopic[message.topic];
    if (subscription == null) {
      // First subscriber for this topic: create the subscription record.
      subscription = Subscription(_idGen.next(), message.topic, {sessionID: sessionID});
      _subscriptionsByTopic[message.topic] = subscription;
    } else {
      subscription.subscribers[sessionID] = sessionID;
    }

    // Keep the per-session index in sync with the per-topic one.
    _subscriptionsBySession.putIfAbsent(sessionID, () => {})[subscription.id] = subscription;

    final subscribed = Subscribed(message.requestID, subscription.id);
    return MessageWithRecipient(subscribed, sessionID);
  } else if (message is UnSubscribe) {
    if (!_subscriptionsBySession.containsKey(sessionID)) {
      throw Exception("cannot unsubscribe, session $sessionID doesn't exist");
    }

    final subscriptions = _subscriptionsBySession[sessionID];
    if (subscriptions == null) {
      throw Exception("cannot unsubscribe, no subscription found");
    }

    final subscription = subscriptions[message.subscriptionID];
    if (subscription == null) {
      // `${...}` is required here: a bare `$message.subscriptionID` would
      // interpolate only `message` and append ".subscriptionID" literally.
      throw Exception("cannot unsubscribe, subscription ${message.subscriptionID} doesn't exist");
    }

    subscription.subscribers.remove(sessionID);
    if (subscription.subscribers.isEmpty) {
      // Last subscriber gone: forget the topic so a later subscribe creates
      // a new subscription with a new id.
      _subscriptionsByTopic.remove(subscription.topic);
    }
    subscriptions.remove(message.subscriptionID);

    final unSubscribed = UnSubscribed(message.requestID);
    return MessageWithRecipient(unSubscribed, sessionID);
  } else {
    throw Exception("message type not supported");
  }
}