handleRealtime method
Future<void>
handleRealtime(
- HttpRequest request,
- Map<SupabaseRequest<SupabaseModel>, SupabaseResponse> responses
Handle realtime/stream requests
Implementation
/// Handles a realtime/stream request by replaying canned [responses] over a
/// WebSocket.
///
/// [request] is upgraded to a WebSocket and stored in `webSocket`. If a
/// listener has already been registered the method returns right after the
/// upgrade; otherwise it subscribes to the socket and, for every incoming
/// JSON message:
///
/// * a `phx_leave` event removes the message's topic from `listeners`;
/// * a topic that is already in `listeners` is ignored;
/// * otherwise the topic is recorded and the first `postgres_changes` entry
///   of the payload is matched (by its `filter`) against the realtime
///   entries of [responses]. When a match exists, a `phx_reply` is sent
///   (unless the subscribed event is `*`), followed by each flattened
///   response, each one preceded by the configured
///   `realtimeSubsequentReplyDelay`.
///
/// Messages whose payload carries no `postgres_changes` configuration are
/// ignored instead of throwing from inside the stream callback.
Future<void> handleRealtime(
  HttpRequest request,
  Map<SupabaseRequest, SupabaseResponse> responses,
) async {
  // NOTE(review): the socket is upgraded and stored before the `hasListener`
  // check, so a second request replaces `webSocket` while the existing
  // listener stays attached to the first socket - confirm this is intended.
  webSocket = await WebSocketTransformer.upgrade(request);
  if (hasListener) {
    return;
  }
  hasListener = true;
  listener = webSocket!.listen((req) async {
    final requestJson = jsonDecode(req);
    final topic = requestJson['topic'];
    final ref = requestJson['ref'];

    if (requestJson['event'] == 'phx_leave') {
      listeners.remove(topic);
      return;
    }
    if (listeners.contains(topic)) return;
    listeners.add(topic);

    // Resolve the first `postgres_changes` entry defensively: a payload
    // without that configuration must not blow up the listener with an
    // uncaught asynchronous error.
    final payload = requestJson['payload'];
    final config = payload is Map ? payload['config'] : null;
    final changes = config is Map ? config['postgres_changes'] : null;
    if (changes is! List || changes.isEmpty) return;
    final subscription = changes.first;
    if (subscription is! Map) return;

    // `filter` is only present when the realtime subscription was created
    // with one, so it may be null here; it is echoed back only when set.
    final realtimeFilter = subscription['filter'];
    final matching = responses.entries.firstWhereOrNull(
      (r) => r.key.realtime && realtimeFilter == r.key.filter,
    );
    if (matching == null) return;

    final flattened = matching.value.flattenedResponses;

    if (subscription['event'] != '*') {
      final replyString = jsonEncode({
        'event': 'phx_reply',
        'payload': {
          'response': {
            'postgres_changes': flattened.map((r) {
              final data = Map<String, dynamic>.from(r.data as Map);
              return {
                'id': data['payload']['ids'][0],
                'event': data['payload']['data']['type'],
                'schema': data['payload']['data']['schema'],
                'table': data['payload']['data']['table'],
                if (realtimeFilter != null) 'filter': realtimeFilter,
              };
            }).toList(),
          },
          'status': 'ok',
        },
        'ref': ref,
        'topic': topic,
      });
      webSocket!.add(replyString);
    }

    // Replay every canned response on the subscribed topic, waiting the
    // configured delay before each one.
    for (final realtimeResponse in flattened) {
      await Future.delayed(matching.value.realtimeSubsequentReplyDelay);
      final data = Map<String, dynamic>.from(realtimeResponse.data as Map);
      final serialized = jsonEncode({...data, 'topic': topic});
      webSocket!.add(serialized);
    }
  });
}