connectWithBackoff method

  1. @override
Future<void> connectWithBackoff()
override

Implementation

@override
Future<void> connectWithBackoff() async {
  if (client.isConnected()) {
    // we're already connected
    return;
  }

  if (initializing != null && !initializing!.finished) {
    // we're already trying to connect to Electric
    // return the promise that resolves when the connection is established
    return initializing!.waitOn();
  }

  final _initializing = initializing;
  if (_initializing == null || _initializing.finished) {
    initializing = Waiter();
  }

  final fut = initializing!.waitOn();

  // This is so that the future is not treated as unhandled
  fut.ignore();

  Future<void> _attemptBody() async {
    if (initializing?.finished == true) {
      return fut;
    }
    await _connect();
    await _startReplication();
    await _makePendingSubscriptions();
    // this._subscribePreviousShapeRequests()

    _notifyConnectivityState(ConnectivityStatus.connected, null);
    initializing?.complete();
  }

  final backoffOpts = opts.connectionBackoffOptions;
  int retryAttempt = 0;
  try {
    await retry_lib.retry(
      () {
        retryAttempt++;
        return _attemptBody();
      },
      maxAttempts: backoffOpts.numOfAttempts,
      maxDelay: backoffOpts.maxDelay,
      delayFactor: backoffOpts.startingDelay,
      randomizationFactor: backoffOpts.randomizationFactor,
      retryIf: (e) {
        return connectRetryHandler(e, retryAttempt);
      },
    );
  } catch (e, st) {
    // We're very sure that no calls are going to modify `this.initializing` before this promise resolves
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    final Object error;
    final StackTrace stackTrace;

    if (!connectRetryHandler(e, 0)) {
      error = e;
      stackTrace = st;
    } else {
      error = SatelliteException(
        SatelliteErrorCode.connectionFailedAfterRetry,
        'Failed to connect to server after exhausting retry policy. Last error thrown by server: $e\n$st',
      );
      stackTrace = StackTrace.current;
    }

    disconnect(error is SatelliteException ? error : null);
    initializing?.completeError(error, stackTrace);
  }

  return fut;
}