subscribe<N extends Object> method

Future<PipeSubscription<N>> subscribe<N extends Object>(
  1. Topic<N> topic, {
  2. void onReconnect()?,
})

Implementation

Future<PipeSubscription<N>> subscribe<N extends Object>(
  Topic<N> topic, {
  void Function()? onReconnect,
}) async {
  var thisTopicSubscription =
      _registeredTopicSubscriptions.firstWhereOrNull((e) => e.topic == topic);

  // Defensive check: if subscription is closed, remove it and treat as non-existent
  if (thisTopicSubscription != null && thisTopicSubscription.isClosed) {
    _registeredTopicSubscriptions.remove(thisTopicSubscription);
    thisTopicSubscription = null;
  }

  if (thisTopicSubscription != null) {
    final state = thisTopicSubscription.stateSubject.value;
    switch (state) {
      case TopicSubscriptionSubscribed():
        return _registerSubscription(
          subscribedState: state,
          onReconnect: onReconnect,
          topic: topic,
        );
      case TopicSubscriptionUnsubscribing():
        // Mark topic subscription to not be fully removed after unsubscribe
        state.newSubscriptionRequestsCount++;
        final index = state.newSubscriptionRequestsCount;

        // Subscription process will continue after unsubscribing future
        await state.completer.future;

        if (index == 1) {
          // Prepare for subscribing
          final subscribingState = TopicSubscriptionSubscribing();
          thisTopicSubscription.stateSubject.add(subscribingState);

          // Subscribe on backend and return a valid pipe subscription
          // (or throw an exception)
          return _sendSubscribeMethodAndVerifyResult(
            subscribingState: subscribingState,
            subscription: thisTopicSubscription,
            onReconnect: onReconnect,
            subscriptionId: thisTopicSubscription.subscriptionId,
            topic: topic,
          );
        } else {
          // Recurrence should now find the topic subscription in different state
          // than unsubscribing (either subscribing or subscribed)
          await Future<void>.delayed(const Duration(milliseconds: 100));
          return subscribe(topic, onReconnect: onReconnect);
        }

      case TopicSubscriptionSubscribing():
      case TopicSubscriptionReconnecting():
        try {
          final state = await thisTopicSubscription.stateSubject
              .doOnError((err, st) {
                _logger.info(
                  'Caught error in second subscription call, while already subscribing/reconnecting.'
                  'Will attempt to connect subscribe either way.',
                  err,
                  st,
                );
                throw const PipeConnectionException();
              })
              .whereType<TopicSubscriptionSubscribed>()
              .first;

          return _registerSubscription(
            subscribedState: state,
            onReconnect: onReconnect,
            topic: topic,
          );
        } catch (_) {
          // Continue with subscription process logic below
        }
    }
  }

  // Register subscription in SignalR
  final subscriptionId = _uuid.v4();

  // Register subscription in pipe client
  final subscribingState = TopicSubscriptionSubscribing();
  final subscription =
      TopicSubscription(topic, subscriptionId, subscribingState);

  _registeredTopicSubscriptions.add(subscription);

  return _sendSubscribeMethodAndVerifyResult(
    subscribingState: subscribingState,
    subscription: subscription,
    onReconnect: onReconnect,
    subscriptionId: subscriptionId,
    topic: topic,
  );
}