subscribeTo method
Implementation
RealtimeSubscription subscribeTo(List<Object> channels,
[List<String> queries = const []]) {
StreamController<RealtimeMessage> controller = StreamController.broadcast();
final channelStrings =
channels.map((ch) => _channelToString(ch)).toList().cast<String>();
final queryStrings = List<String>.from(queries);
// Allocate a new slot index
_subscriptionsCounter++;
final slot = _subscriptionsCounter;
// Store slot-centric data: channels, queries, and callback belong to the slot
// queries is stored as List<String> (array of query strings)
// No channel mutation occurs here - channels are derived from slots in _createSocket()
RealtimeSubscription subscription = RealtimeSubscription(
controller: controller,
channels: channelStrings,
queries: queryStrings,
close: () async {
final subscriptionId = _slotToSubscriptionId[slot];
_subscriptions.remove(slot);
_slotToSubscriptionId.remove(slot);
if (subscriptionId != null) {
_subscriptionIdToSlot.remove(subscriptionId);
}
controller.close();
// Rebuild channels from remaining slots
final remainingChannels = <String>{};
for (var sub in _subscriptions.values) {
remainingChannels.addAll(sub.channels);
}
if (remainingChannels.isNotEmpty) {
await Future.delayed(Duration.zero, () => _createSocket());
} else {
await _closeConnection();
}
});
_subscriptions[slot] = subscription;
Future.delayed(Duration.zero, () => _createSocket());
return subscription;
}