subscribe<N extends Object> method
Implementation
Future<PipeSubscription<N>> subscribe<N extends Object>(
Topic<N> topic, {
void Function()? onReconnect,
}) async {
var thisTopicSubscription =
_registeredTopicSubscriptions.firstWhereOrNull((e) => e.topic == topic);
// Defensive check: if subscription is closed, remove it and treat as non-existent
if (thisTopicSubscription != null && thisTopicSubscription.isClosed) {
_registeredTopicSubscriptions.remove(thisTopicSubscription);
thisTopicSubscription = null;
}
if (thisTopicSubscription != null) {
final state = thisTopicSubscription.stateSubject.value;
switch (state) {
case TopicSubscriptionSubscribed():
return _registerSubscription(
subscribedState: state,
onReconnect: onReconnect,
topic: topic,
);
case TopicSubscriptionUnsubscribing():
// Mark topic subscription to not be fully removed after unsubscribe
state.newSubscriptionRequestsCount++;
final index = state.newSubscriptionRequestsCount;
// Subscription process will continue after unsubscribing future
await state.completer.future;
if (index == 1) {
// Prepare for subscribing
final subscribingState = TopicSubscriptionSubscribing();
thisTopicSubscription.stateSubject.add(subscribingState);
// Subscribe on backend and return a valid pipe subscription
// (or throw an exception)
return _sendSubscribeMethodAndVerifyResult(
subscribingState: subscribingState,
subscription: thisTopicSubscription,
onReconnect: onReconnect,
subscriptionId: thisTopicSubscription.subscriptionId,
topic: topic,
);
} else {
// Recurrence should now find the topic subscription in different state
// than unsubscribing (either subscribing or subscribed)
await Future<void>.delayed(const Duration(milliseconds: 100));
return subscribe(topic, onReconnect: onReconnect);
}
case TopicSubscriptionSubscribing():
case TopicSubscriptionReconnecting():
try {
final state = await thisTopicSubscription.stateSubject
.doOnError((err, st) {
_logger.info(
'Caught error in second subscription call, while already subscribing/reconnecting.'
'Will attempt to connect subscribe either way.',
err,
st,
);
throw const PipeConnectionException();
})
.whereType<TopicSubscriptionSubscribed>()
.first;
return _registerSubscription(
subscribedState: state,
onReconnect: onReconnect,
topic: topic,
);
} catch (_) {
// Continue with subscription process logic below
}
}
}
// Register subscription in SignalR
final subscriptionId = _uuid.v4();
// Register subscription in pipe client
final subscribingState = TopicSubscriptionSubscribing();
final subscription =
TopicSubscription(topic, subscriptionId, subscribingState);
_registeredTopicSubscriptions.add(subscription);
return _sendSubscribeMethodAndVerifyResult(
subscribingState: subscribingState,
subscription: subscription,
onReconnect: onReconnect,
subscriptionId: subscriptionId,
topic: topic,
);
}