subscribe method
Register the subscription listener.
You can subscribe multiple times to the same topic.
If the SSE connection is not started yet, this method will also initialize it.
Here is an example listening to the connect/reconnect events:
pb.realtime.subscribe("PB_CONNECT", (e) {
print("Connected: $e");
});
Implementation
Future<UnsubscribeFunc> subscribe(
String topic,
SubscriptionFunc listener, {
String? expand,
String? filter,
String? fields,
Map<String, dynamic> query = const {},
Map<String, String> headers = const {},
}) async {
var key = topic;
// merge query parameters
final enrichedQuery = Map<String, dynamic>.of(query);
if (expand?.isNotEmpty ?? false) {
enrichedQuery["expand"] ??= expand;
}
if (filter?.isNotEmpty ?? false) {
enrichedQuery["filter"] ??= filter;
}
if (fields?.isNotEmpty ?? false) {
enrichedQuery["fields"] ??= fields;
}
// serialize and append the topic options (if any)
final options = <String, dynamic>{};
if (enrichedQuery.isNotEmpty) {
options["query"] = enrichedQuery;
}
if (headers.isNotEmpty) {
options["headers"] = headers;
}
if (options.isNotEmpty) {
final encoded =
"options=${Uri.encodeQueryComponent(jsonEncode(options))}";
key += (key.contains("?") ? "&" : "?") + encoded;
}
if (!_subscriptions.containsKey(key)) {
_subscriptions[key] = [];
}
_subscriptions[key]?.add(listener);
// start a new sse connection
if (_sse == null) {
await _connect();
} else if (_clientId.isNotEmpty && _subscriptions[key]?.length == 1) {
// otherwise - just persist the updated subscriptions
// (if it is the first for the topic)
await _submitSubscriptions();
}
return () async {
return unsubscribeByTopicAndListener(topic, listener);
};
}