handleRealtime method

Future<void> handleRealtime(
  HttpRequest request,
  Map<SupabaseRequest<SupabaseModel>, SupabaseResponse> responses,
)

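Handles realtime (stream) requests: upgrades the HTTP request to a WebSocket, then answers each new topic subscription with the realtime responses whose filter matches the subscription.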

Implementation

/// Upgrades [request] to a WebSocket and serves the realtime entries of
/// [responses] over it.
///
/// Each incoming message is decoded as JSON. A `phx_leave` event removes the
/// message's topic from [listeners]; any other message for a topic that is
/// not yet in [listeners] registers the topic and, when a realtime request in
/// [responses] has the same filter as the subscription, sends that request's
/// flattened responses back over the socket.
Future<void> handleRealtime(
  HttpRequest request,
  Map<SupabaseRequest, SupabaseResponse> responses,
) async {
  webSocket = await WebSocketTransformer.upgrade(request);

  // Only the first call attaches a listener; later calls still replace
  // `webSocket` above but return before subscribing again.
  if (hasListener) return;
  hasListener = true;

  listener = webSocket!.listen((rawMessage) async {
    final message = jsonDecode(rawMessage);
    final topic = message['topic'];
    final ref = message['ref'];

    if (message['event'] == 'phx_leave') {
      listeners.remove(topic);
      return;
    }

    // A topic is answered once; repeated messages for it are ignored.
    if (listeners.contains(topic)) return;
    listeners.add(topic);

    // Only the first `postgres_changes` entry of the subscription is read.
    final subscription = message['payload']['config']['postgres_changes'].first;
    final realtimeFilter = subscription['filter'];

    // Find the first realtime request whose filter equals the subscription's.
    MapEntry<SupabaseRequest, SupabaseResponse>? match;
    for (final candidate in responses.entries) {
      if (candidate.key.realtime && realtimeFilter == candidate.key.filter) {
        match = candidate;
        break;
      }
    }
    if (match == null) return;

    // Subscriptions to the wildcard event get no `phx_reply` acknowledgement.
    if (subscription['event'] != '*') {
      final changes = match.value.flattenedResponses.map((response) {
        final body = Map<String, dynamic>.from(response.data as Map);
        final payload = body['payload'];

        return {
          'id': payload['ids'][0],
          'event': payload['data']['type'],
          'schema': payload['data']['schema'],
          'table': payload['data']['table'],
          // The subscription may or may not carry a filter, so it is echoed
          // back only when the request included one.
          if (realtimeFilter != null) 'filter': realtimeFilter,
        };
      }).toList();

      final reply = {
        'event': 'phx_reply',
        'payload': {
          'response': {'postgres_changes': changes},
          'status': 'ok',
        },
        'ref': ref,
        'topic': topic,
      };
      webSocket!.add(jsonEncode(reply));
    }

    // Replay every configured response, waiting the configured delay before
    // each one and stamping it with the subscribing topic.
    for (final response in match.value.flattenedResponses) {
      await Future.delayed(match.value.realtimeSubsequentReplyDelay);
      final body = Map<String, dynamic>.from(response.data as Map);
      webSocket!.add(jsonEncode({...body, 'topic': topic}));
    }
  });
}