registerPubSub static method
void
registerPubSub(
- PubSub pubSub, {
- ChaCha20Key? key,
- int? syncSince,
})
Implementation
static void registerPubSub(PubSub pubSub,
{ChaCha20Key? key, int? syncSince}) {
if (!supervisedTopicByTopic.containsKey(pubSub.getLocalizedTopic())) {
supervisedTopicByTopic[pubSub.getLocalizedTopic()] =
SupervisedTopic(channel: null, pubSub: pubSub, eventSink: eventSink);
}
pubSub.timeline.desynchronize();
if (LocalitySocialCloud.isTestMode) {
pubSub.timeline.synchronize();
} else {
supervisor.listen(pubSub.getLocalizedTopic(), (phoenixChannel) {
// Save the channel and key for future use.
supervisedTopicByTopic[pubSub.getLocalizedTopic()] = SupervisedTopic(
channel: StandardSendMessage(phoenixChannel),
pubSub: pubSub,
eventSink: eventSink,
key: key);
// Listen for messages on the channel.
phoenixChannel.messages.listen((event) async {
if (event.isReply == false) {
// Handle incoming events.
processEvent(pubSub, event.payload!, key);
} else {
if (event.payload!.containsKey('response')) {
dynamic response = event.payload!['response'];
if (response.runtimeType == List) {
List<dynamic> result = response;
for (int i = 0; i < result.length; i++) {
// Process multiple events in the response.
processEvent(pubSub, result[i], key);
}
pubSub.timeline.synchronize();
}
}
}
});
// Send any waiting messages in the queue.
MessageQueueSupervisor.getQueueForChannelId(pubSub.getLocalizedTopic())
.waitingMessages
.forEach((element) {
sendMessage(pubSub, element.event, element.payload);
});
// Fetch previously cached messages for the channel, if cache is enabled.
if (cache != null) {
cache!
.getAllMessagesByChannel(pubSub.getLocalizedTopic())
.then((List<LocalityEvent> cachedMessages) {
// Execute Event Sink
for (var cachedMessage in cachedMessages) {
eventSink(pubSub, cachedMessage);
}
// Fetch new messages since the last cached message.
cache!
.getChannelFetchTime(pubSub.getLocalizedTopic())
.then((int channelTime) {
PubSubSupervisor.fetchSince(pubSub, channelTime);
});
});
} else {
// Fetch all messages if no cache is available.
PubSubSupervisor.fetchSince(pubSub, syncSince ?? 0);
}
});
}
}