connect method

Future<void> connect({required PowerSyncBackendConnector connector})

Connect to the PowerSync service, and keep the databases in sync.

The connection is automatically re-opened if it fails for any reason.

Status changes are reported on statusStream.

Implementation

/// Connects to the PowerSync service and keeps the databases in sync.
///
/// Any existing connection is disconnected first. The sync work then runs in
/// a separate isolate spawned with [_powerSyncDatabaseIsolate]; that isolate
/// talks back to this one over a [ReceivePort] using `[action, payload]`
/// list messages (credentials requests, CRUD uploads, status and log
/// forwarding).
///
/// If the sync isolate reports an uncaught error, the error is logged and
/// [connect] is called again with the same [connector]. Status updates
/// received from the isolate are applied through [_setStatus].
///
/// All ports and stream subscriptions created for a connection attempt are
/// released once the sync isolate exits, or immediately if the connection is
/// aborted before the isolate is spawned.
Future<void> connect({required PowerSyncBackendConnector connector}) async {
  await initialize();

  // Disconnect if connected
  await disconnect();
  final disconnector = AbortController();
  _disconnecter = disconnector;

  await _initialized;
  final dbref = database.isolateConnectionFactory();
  final rPort = ReceivePort();
  StreamSubscription? updateSubscription;
  rPort.listen((data) async {
    if (data is List) {
      String action = data[0];
      if (action == "getCredentials") {
        await (data[1] as PortCompleter).handle(() async {
          final token = await connector.getCredentialsCached();
          // NOTE(review): this interpolates the credentials object into the
          // log at fine level - confirm its string form exposes no secrets.
          logger.fine('Credentials: $token');
          return token;
        });
      } else if (action == "invalidateCredentials") {
        logger.fine('Refreshing credentials');
        await (data[1] as PortCompleter).handle(() async {
          await connector.prefetchCredentials();
        });
      } else if (action == 'init') {
        // The isolate hands over the port on which it wants to be notified.
        SendPort port = data[1];
        var throttled = UpdateNotification.throttleStream(
            updates, const Duration(milliseconds: 10));
        updateSubscription = throttled.listen((event) {
          port.send(['update']);
        });
        // Ask the isolate to shut down once this connection is aborted.
        disconnector.onAbort.then((_) {
          port.send(['close']);
        }).ignore();
      } else if (action == 'uploadCrud') {
        await (data[1] as PortCompleter).handle(() async {
          await connector.uploadData(this);
        });
      } else if (action == 'status') {
        final SyncStatus status = data[1];
        _setStatus(status);
      } else if (action == 'close') {
        // Clear status apart from lastSyncedAt
        _setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt));
        rPort.close();
        updateSubscription?.cancel();
      } else if (action == 'log') {
        LogRecord record = data[1];
        logger.log(
            record.level, record.message, record.error, record.stackTrace);
      }
    }
  });

  // Both ports are created up front so that `disconnected()` below can close
  // them on every exit path.
  final errorPort = ReceivePort();
  final exitPort = ReceivePort();

  // Releases everything owned by this connection attempt. Closing a port or
  // cancelling a subscription a second time is harmless, so this is safe to
  // run after the 'close' action has already done part of the work.
  void disconnected() {
    _disconnecter?.completeAbort();
    _disconnecter = null;
    rPort.close();
    // Open ReceivePorts are never reclaimed on their own; without these two
    // calls every (re)connect would leave two ports behind.
    errorPort.close();
    exitPort.close();
    // The isolate may exit without ever sending 'close' (e.g. after an
    // uncaught error), so the update listener has to be stopped here too.
    updateSubscription?.cancel();
    // Clear status apart from lastSyncedAt
    _setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt));
  }

  errorPort.listen((message) async {
    // Sample error:
    // flutter: [PowerSync] WARNING: 2023-06-28 16:34:11.566122: Sync Isolate error
    // flutter: [Connection closed while receiving data, #0      IOClient.send.<anonymous closure> (package:http/src/io_client.dart:76:13)
    // #1      Stream.handleError.<anonymous closure> (dart:async/stream.dart:929:16)
    // #2      _HandleErrorStream._handleError (dart:async/stream_pipe.dart:269:17)
    // #3      _ForwardingStreamSubscription._handleError (dart:async/stream_pipe.dart:157:13)
    // #4      _HttpClientResponse.listen.<anonymous closure> (dart:_http/http_impl.dart:707:16)
    // ...
    logger.severe('Sync Isolate error', message);

    // Reconnect
    connect(connector: connector);
  });

  exitPort.listen((message) {
    logger.fine('Sync Isolate exit');
    disconnected();
  });

  // The connection may have been aborted while waiting for initialization;
  // in that case clean up instead of spawning the isolate.
  if (_disconnecter?.aborted == true) {
    disconnected();
    return;
  }

  Isolate.spawn(_powerSyncDatabaseIsolate,
      _PowerSyncDatabaseIsolateArgs(rPort.sendPort, dbref, retryDelay),
      debugName: 'PowerSyncDatabase',
      onError: errorPort.sendPort,
      onExit: exitPort.sendPort);
}