subscribeTo method
Implementation
RealtimeSubscription subscribeTo(List<Object> channels,
[List<String> queries = const []]) {
StreamController<RealtimeMessage> controller = StreamController.broadcast();
final channelStrings =
channels.map((ch) => _channelToString(ch)).toList().cast<String>();
final queryStrings = List<String>.from(queries);
final subscriptionId = _generateUniqueSubscriptionId();
late RealtimeSubscription subscription;
Future<void> unsubscribe() async {
if (!_subscriptions.containsKey(subscriptionId)) {
return;
}
_subscriptions.remove(subscriptionId);
_pendingSubscribes.remove(subscriptionId);
await controller.close();
_sendUnsubscribeMessage([subscriptionId]);
}
Future<void> update({List<Object>? channels, List<String>? queries}) async {
final current = _subscriptions[subscriptionId];
if (current == null) {
return;
}
if (channels != null) {
final nextChannels =
channels.map((ch) => _channelToString(ch)).toList().cast<String>();
current.channels
..clear()
..addAll(nextChannels);
}
if (queries != null) {
current.queries
..clear()
..addAll(queries);
}
_enqueuePendingSubscribe(subscriptionId);
if (_websok != null && _websok?.closeCode == null) {
await Future.delayed(Duration.zero, _sendPendingSubscribes);
} else {
await Future.delayed(Duration.zero, () => _createSocket());
}
}
Future<void> close() async {
await unsubscribe();
if (_subscriptions.isEmpty) {
await _closeConnection();
}
}
subscription = RealtimeSubscription(
controller: controller,
channels: channelStrings,
queries: queryStrings,
unsubscribe: unsubscribe,
update: update,
close: close,
);
_subscriptions[subscriptionId] = subscription;
_enqueuePendingSubscribe(subscriptionId);
Future.delayed(Duration.zero, () => _createSocket());
return subscription;
}