subscribe method

RealtimeChannel subscribe([
  void callback(
    RealtimeSubscribeStatus status,
    Object? error
  )?,
  Duration? timeout
])

Subscribes to receive real-time changes.

Pass a callback to react to different status changes.

The timeout parameter can be used to override the default timeout set on RealtimeClient.

Implementation

/// Subscribes to receive real-time changes.
///
/// Connects the socket first if it is not connected, then sends the join
/// request with a payload built from the channel's `broadcast` and `presence`
/// config, the filters of the registered `postgres_changes` bindings and, when
/// available, the socket's access token.
///
/// [callback], when provided, is invoked with:
///  * [RealtimeSubscribeStatus.subscribed] once the join is acknowledged and
///    every client `postgres_changes` binding matches the filter the server
///    returned at the same position;
///  * [RealtimeSubscribeStatus.channelError] when the channel reports an
///    error, the join is rejected, or the server filters do not match the
///    client bindings (in that last case the channel is also unsubscribed);
///  * [RealtimeSubscribeStatus.closed] when the channel closes;
///  * [RealtimeSubscribeStatus.timedOut] when the join times out.
///
/// [timeout] overrides the channel's default timeout for the join request.
///
/// Throws a [String] message if called more than once on the same channel
/// instance. Returns this channel.
RealtimeChannel subscribe([
  void Function(RealtimeSubscribeStatus status, Object? error)? callback,
  Duration? timeout,
]) {
  if (!socket.isConnected) {
    socket.connect();
  }
  if (joinedOnce == true) {
    throw "tried to subscribe multiple times. 'subscribe' can only be called a single time per channel instance";
  }

  final broadcast = params['config']['broadcast'];
  final presence = params['config']['presence'];

  _onError((e) {
    callback?.call(RealtimeSubscribeStatus.channelError, e);
  });
  _onClose(() {
    callback?.call(RealtimeSubscribeStatus.closed, null);
  });

  final config = <String, dynamic>{
    'broadcast': broadcast,
    'presence': presence,
    'postgres_changes':
        _bindings['postgres_changes']?.map((r) => r.filter).toList() ?? [],
  };

  final accessToken = socket.accessToken;
  updateJoinPayload({
    'config': config,
    if (accessToken != null) 'access_token': accessToken,
  });

  joinedOnce = true;
  rejoin(timeout ?? _timeout);

  joinPush.receive('ok', (response) {
    final serverPostgresFilters = response['postgres_changes'];
    if (socket.accessToken != null) socket.setAuth(socket.accessToken);

    if (serverPostgresFilters == null) {
      callback?.call(RealtimeSubscribeStatus.subscribed, null);
      return;
    }

    final clientPostgresBindings = _bindings['postgres_changes'];
    final bindingsLen = clientPostgresBindings?.length ?? 0;
    final newPostgresBindings = <Binding>[];

    for (var i = 0; i < bindingsLen; i++) {
      final clientPostgresBinding = clientPostgresBindings![i];
      final clientFilter = clientPostgresBinding.filter;

      // The server may acknowledge fewer filters than the client registered.
      // Indexing past the end of a Dart list throws a RangeError, so a
      // missing entry is mapped to null and reported as a mismatch below.
      final serverPostgresFilter = serverPostgresFilters is List &&
              i < serverPostgresFilters.length
          ? serverPostgresFilters[i]
          : null;

      final matches = serverPostgresFilter != null &&
          serverPostgresFilter['event'] == clientFilter['event'] &&
          serverPostgresFilter['schema'] == clientFilter['schema'] &&
          serverPostgresFilter['table'] == clientFilter['table'] &&
          serverPostgresFilter['filter'] == clientFilter['filter'];

      if (!matches) {
        unsubscribe();
        callback?.call(
          RealtimeSubscribeStatus.channelError,
          Exception(
              'mismatch between server and client bindings for postgres changes'),
        );
        return;
      }

      // Carry the server-assigned id over to the client binding.
      newPostgresBindings.add(clientPostgresBinding.copyWith(
        id: serverPostgresFilter['id']?.toString(),
      ));
    }

    _bindings['postgres_changes'] = newPostgresBindings;

    callback?.call(RealtimeSubscribeStatus.subscribed, null);
  }).receive('error', (error) {
    if (callback != null) {
      // A payload that is not a non-empty map falls back to the generic
      // 'error' message instead of failing on a type cast.
      final message = error is Map && error.isNotEmpty
          ? error.values.join(', ')
          : 'error';
      callback(
        RealtimeSubscribeStatus.channelError,
        Exception(jsonEncode(message)),
      );
    }
  }).receive('timeout', (_) {
    callback?.call(RealtimeSubscribeStatus.timedOut, null);
  });

  return this;
}