subscribe method
RealtimeChannel
subscribe([
- void callback(
- RealtimeSubscribeStatus status,
- Object? error
- Duration? timeout
Subscribes to receive real-time changes
Pass a callback
to react to different status changes.
timeout
parameter can be used to override the default timeout set on RealtimeClient.
Implementation
RealtimeChannel subscribe([
void Function(RealtimeSubscribeStatus status, Object? error)? callback,
Duration? timeout,
]) {
if (!socket.isConnected) {
socket.connect();
}
if (joinedOnce == true) {
throw "tried to subscribe multiple times. 'subscribe' can only be called a single time per channel instance";
} else {
final broadcast = params['config']['broadcast'];
final presence = params['config']['presence'];
_onError((e) {
if (callback != null) callback(RealtimeSubscribeStatus.channelError, e);
});
_onClose(() {
if (callback != null) callback(RealtimeSubscribeStatus.closed, null);
});
final accessTokenPayload = <String, String>{};
final config = <String, dynamic>{
'broadcast': broadcast,
'presence': presence,
'postgres_changes':
_bindings['postgres_changes']?.map((r) => r.filter).toList() ?? [],
};
if (socket.accessToken != null) {
accessTokenPayload['access_token'] = socket.accessToken!;
}
updateJoinPayload({'config': config, ...accessTokenPayload});
joinedOnce = true;
rejoin(timeout ?? _timeout);
joinPush.receive(
'ok',
(response) {
final serverPostgresFilters = response['postgres_changes'];
if (socket.accessToken != null) socket.setAuth(socket.accessToken);
if (serverPostgresFilters == null) {
if (callback != null) {
callback(RealtimeSubscribeStatus.subscribed, null);
}
return;
} else {
final clientPostgresBindings = _bindings['postgres_changes'];
final bindingsLen = clientPostgresBindings?.length ?? 0;
final newPostgresBindings = <Binding>[];
for (var i = 0; i < bindingsLen; i++) {
final clientPostgresBinding = clientPostgresBindings![i];
final event = clientPostgresBinding.filter['event'];
final schema = clientPostgresBinding.filter['schema'];
final table = clientPostgresBinding.filter['table'];
final filter = clientPostgresBinding.filter['filter'];
final serverPostgresFilter = serverPostgresFilters[i];
if (serverPostgresFilter != null &&
serverPostgresFilter['event'] == event &&
serverPostgresFilter['schema'] == schema &&
serverPostgresFilter['table'] == table &&
serverPostgresFilter['filter'] == filter) {
newPostgresBindings.add(clientPostgresBinding.copyWith(
id: serverPostgresFilter['id']?.toString(),
));
} else {
unsubscribe();
if (callback != null) {
callback(
RealtimeSubscribeStatus.channelError,
Exception(
'mismatch between server and client bindings for postgres changes'),
);
}
return;
}
}
_bindings['postgres_changes'] = newPostgresBindings;
if (callback != null) {
callback(RealtimeSubscribeStatus.subscribed, null);
}
return;
}
},
).receive('error', (error) {
if (callback != null) {
callback(
RealtimeSubscribeStatus.channelError,
Exception(
jsonEncode((error as Map<String, dynamic>).isNotEmpty
? (error).values.join(', ')
: 'error'),
),
);
}
return;
}).receive('timeout', (_) {
if (callback != null) callback(RealtimeSubscribeStatus.timedOut, null);
return;
});
}
return this;
}