subscribeTo method

RealtimeSubscription subscribeTo(
  1. List<Object> channels, [
  2. List<String> queries = const []
])
inherited

Implementation

RealtimeSubscription subscribeTo(List<Object> channels,
    [List<String> queries = const []]) {
  StreamController<RealtimeMessage> controller = StreamController.broadcast();
  final channelStrings =
      channels.map((ch) => _channelToString(ch)).toList().cast<String>();
  final queryStrings = List<String>.from(queries);

  final subscriptionId = _generateUniqueSubscriptionId();

  late RealtimeSubscription subscription;

  Future<void> unsubscribe() async {
    if (!_subscriptions.containsKey(subscriptionId)) {
      return;
    }
    _subscriptions.remove(subscriptionId);
    _pendingSubscribes.remove(subscriptionId);
    await controller.close();
    _sendUnsubscribeMessage([subscriptionId]);
  }

  Future<void> update({List<Object>? channels, List<String>? queries}) async {
    final current = _subscriptions[subscriptionId];
    if (current == null) {
      return;
    }
    if (channels != null) {
      final nextChannels =
          channels.map((ch) => _channelToString(ch)).toList().cast<String>();
      current.channels
        ..clear()
        ..addAll(nextChannels);
    }
    if (queries != null) {
      current.queries
        ..clear()
        ..addAll(queries);
    }
    _enqueuePendingSubscribe(subscriptionId);
    if (_websok != null && _websok?.closeCode == null) {
      await Future.delayed(Duration.zero, _sendPendingSubscribes);
    } else {
      await Future.delayed(Duration.zero, () => _createSocket());
    }
  }

  Future<void> close() async {
    await unsubscribe();
    if (_subscriptions.isEmpty) {
      await _closeConnection();
    }
  }

  subscription = RealtimeSubscription(
    controller: controller,
    channels: channelStrings,
    queries: queryStrings,
    unsubscribe: unsubscribe,
    update: update,
    close: close,
  );
  _subscriptions[subscriptionId] = subscription;
  _enqueuePendingSubscribe(subscriptionId);

  Future.delayed(Duration.zero, () => _createSocket());
  return subscription;
}