subscribe method

Future<UnsubscribeFunc> subscribe(
  String topic,
  SubscriptionFunc listener, {
  String? expand,
  String? filter,
  String? fields,
  Map<String, dynamic> query = const {},
  Map<String, String> headers = const {},
})

Register the subscription listener.

You can subscribe multiple times to the same topic.

If the SSE connection is not started yet, this method will also initialize it.

Here is an example listening to the connect/reconnect events:

// "PB_CONNECT" is the topic used here to receive the connect/reconnect
// events; the callback's argument is simply printed.
pb.realtime.subscribe("PB_CONNECT", (e) {
  print("Connected: $e");
});

Implementation

/// Registers [listener] under [topic] and returns a function that removes it.
///
/// The non-empty [expand], [filter] and [fields] values are merged into a
/// copy of [query]; an entry already present in [query] with a non-null
/// value is kept as-is. When the merged query or [headers] is non-empty they
/// are JSON encoded, URL encoded and appended to the topic as an `options`
/// query parameter, and the resulting string is the key the listener is
/// stored under.
///
/// If there is no SSE connection yet (`_sse` is `null`) a new one is
/// started. Otherwise, when a client id is already available and this is the
/// first listener for the key, the updated subscriptions are submitted.
///
/// The returned function calls [unsubscribeByTopicAndListener] with the
/// original [topic] (not the options-suffixed key) and [listener].
Future<UnsubscribeFunc> subscribe(
  String topic,
  SubscriptionFunc listener, {
  String? expand,
  String? filter,
  String? fields,
  Map<String, dynamic> query = const {},
  Map<String, String> headers = const {},
}) async {
  // Work on a copy so the caller's (possibly const) map is never mutated.
  final mergedQuery = <String, dynamic>{...query};

  // Shortcut parameters, in the order they must be added to the query.
  final shortcuts = <String, String?>{
    "expand": expand,
    "filter": filter,
    "fields": fields,
  };
  for (final shortcut in shortcuts.entries) {
    final value = shortcut.value;
    if (value == null || value.isEmpty) {
      continue;
    }
    // An explicit non-null entry in `query` takes precedence.
    mergedQuery[shortcut.key] ??= value;
  }

  // Only the non-empty option groups are serialized.
  final options = <String, dynamic>{
    if (mergedQuery.isNotEmpty) "query": mergedQuery,
    if (headers.isNotEmpty) "headers": headers,
  };

  var subscriptionKey = topic;
  if (options.isNotEmpty) {
    // The topic may already carry its own query string.
    final separator = topic.contains("?") ? "&" : "?";
    final payload = Uri.encodeQueryComponent(jsonEncode(options));
    subscriptionKey = "$topic${separator}options=$payload";
  }

  final listeners = _subscriptions.putIfAbsent(subscriptionKey, () => []);
  listeners.add(listener);

  if (_sse == null) {
    // No connection yet - start a new one.
    await _connect();
  } else if (_clientId.isNotEmpty && listeners.length == 1) {
    // Already connected - persist the updated subscriptions, but only
    // when this is the first listener for the key.
    await _submitSubscriptions();
  }

  return () async => unsubscribeByTopicAndListener(topic, listener);
}