subscribeTo method

RealtimeSubscription subscribeTo(
  List<Object> channels, [
  List<String> queries = const []
])
inherited

Implementation

/// Opens a realtime subscription for [channels], optionally narrowed by
/// [queries].
///
/// Every entry of [channels] is converted to a string with
/// `_channelToString`, and [queries] is copied, so the subscription keeps its
/// own lists. The subscription is registered under a freshly allocated slot
/// index and a call to `_createSocket` is scheduled with a zero-delay future.
///
/// Returns the new [RealtimeSubscription]. Its `close` callback unregisters
/// the slot, closes the message controller, and then either schedules
/// `_createSocket` again (when other slots still have channels) or awaits
/// `_closeConnection`. Calling `close` more than once is safe: repeat calls
/// do not touch the socket again.
RealtimeSubscription subscribeTo(List<Object> channels,
    [List<String> queries = const []]) {
  final controller = StreamController<RealtimeMessage>.broadcast();
  final channelStrings =
      channels.map((ch) => _channelToString(ch)).toList().cast<String>();
  final queryStrings = List<String>.from(queries);

  // Allocate a new slot index
  _subscriptionsCounter++;
  final slot = _subscriptionsCounter;

  // Store slot-centric data: channels, queries, and callback belong to the slot
  // queries is stored as List<String> (array of query strings)
  // No channel mutation occurs here - channels are derived from slots in _createSocket()
  final subscription = RealtimeSubscription(
      controller: controller,
      channels: channelStrings,
      queries: queryStrings,
      close: () async {
        // remove() yields null when the slot is already gone, i.e. this
        // subscription was closed before.
        final wasRegistered = _subscriptions.remove(slot) != null;
        final subscriptionId = _slotToSubscriptionId.remove(slot);
        if (subscriptionId != null) {
          _subscriptionIdToSlot.remove(subscriptionId);
        }
        controller.close();

        // A repeated close changes nothing, so do not recreate the socket or
        // close the connection a second time.
        if (!wasRegistered) {
          return;
        }

        // Check whether any remaining slot still has channels
        final hasRemainingChannels =
            _subscriptions.values.any((sub) => sub.channels.isNotEmpty);

        if (hasRemainingChannels) {
          await Future.delayed(Duration.zero, () => _createSocket());
        } else {
          await _closeConnection();
        }
      });
  _subscriptions[slot] = subscription;

  Future.delayed(Duration.zero, () => _createSocket());
  return subscription;
}