registerPubSub static method

static void registerPubSub(
  PubSub pubSub, {
  ChaCha20Key? key,
  int? syncSince,
})

Implementation

/// Registers [pubSub] under its localized topic and wires up channel handling.
///
/// A placeholder [SupervisedTopic] (with no channel) is stored for the topic if
/// none exists yet, and the timeline is marked desynchronized. In test mode the
/// timeline is synchronized immediately and nothing else is done. Otherwise a
/// listener is registered with `supervisor`; when it hands back a channel, this
/// method:
///
/// 1. stores a [SupervisedTopic] holding the channel and [key] for the topic;
/// 2. subscribes to the channel's messages, passing non-reply payloads and the
///    items of list-valued `response` replies to `processEvent` (the timeline
///    is synchronized after a list reply has been processed);
/// 3. sends every message waiting in the topic's queue via `sendMessage`;
/// 4. if a cache is configured, feeds all cached messages for the topic to
///    `eventSink` and then calls `fetchSince` with the cached channel fetch
///    time; otherwise calls `fetchSince` with [syncSince] (0 when omitted).
///
/// [key] is forwarded to `processEvent` and stored on the [SupervisedTopic]
/// created once the channel is available. [syncSince] is only consulted when
/// no cache is configured.
static void registerPubSub(PubSub pubSub,
    {ChaCha20Key? key, int? syncSince}) {
  final topic = pubSub.getLocalizedTopic();

  // NOTE(review): this placeholder entry is created without `key`, unlike the
  // entry stored once the channel is available - confirm that is intended.
  supervisedTopicByTopic.putIfAbsent(
      topic,
      () => SupervisedTopic(
          channel: null, pubSub: pubSub, eventSink: eventSink));
  pubSub.timeline.desynchronize();

  if (LocalitySocialCloud.isTestMode) {
    pubSub.timeline.synchronize();
  } else {
    supervisor.listen(topic, (phoenixChannel) {
      // Save the channel and key for future use.
      supervisedTopicByTopic[topic] = SupervisedTopic(
          channel: StandardSendMessage(phoenixChannel),
          pubSub: pubSub,
          eventSink: eventSink,
          key: key);

      // Listen for messages on the channel.
      phoenixChannel.messages.listen((event) async {
        // Read the payload once; a message without a payload has nothing to
        // process, so skip it instead of failing on a null assertion.
        final payload = event.payload;
        if (payload == null) {
          return;
        }

        if (event.isReply == false) {
          // Handle incoming events.
          processEvent(pubSub, payload, key);
          return;
        }

        // Replies: only list-valued `response` entries carry events. `is List`
        // accepts any list type, unlike an exact `runtimeType` comparison.
        final response = payload['response'];
        if (response is List) {
          for (final item in response) {
            processEvent(pubSub, item, key);
          }
          pubSub.timeline.synchronize();
        }
      });

      // Send any waiting messages in the queue.
      final waitingMessages =
          MessageQueueSupervisor.getQueueForChannelId(topic).waitingMessages;
      for (final queued in waitingMessages) {
        sendMessage(pubSub, queued.event, queued.payload);
      }

      // Capture the cache locally so the null check also covers the
      // asynchronous callbacks below.
      final messageCache = cache;
      if (messageCache != null) {
        // Replay cached messages first, then fetch everything newer than the
        // cached channel fetch time.
        messageCache
            .getAllMessagesByChannel(topic)
            .then((List<LocalityEvent> cachedMessages) {
          for (final cachedMessage in cachedMessages) {
            eventSink(pubSub, cachedMessage);
          }
          return messageCache.getChannelFetchTime(topic);
        }).then((int channelTime) {
          PubSubSupervisor.fetchSince(pubSub, channelTime);
        });
      } else {
        // No cache available: fetch from `syncSince`, or from 0 when omitted.
        PubSubSupervisor.fetchSince(pubSub, syncSince ?? 0);
      }
    });
  }
}