getCrudTransactions method

Stream<CrudTransaction> getCrudTransactions()
inherited

Returns a stream of completed transactions with local writes against the database.

This is typically used from the PowerSyncBackendConnector.uploadData method. Each entry emitted by the stream is a full transaction containing all local writes made while that transaction was active.

Unlike getNextCrudTransaction, which always returns the oldest transaction that hasn't been completed (via CrudTransaction.complete) yet, this stream can be used to receive multiple transactions. Calling CrudTransaction.complete will mark that transaction and all prior transactions emitted by the stream as completed.

This can be used to upload multiple transactions in a single batch, e.g. with:

CrudTransaction? lastTransaction;
final batch = <CrudEntry>[];

// Collect whole transactions until the batch holds more than 100 entries.
// Breaking out of the loop cancels the stream subscription.
await for (final transaction in powersync.getCrudTransactions()) {
  batch.addAll(transaction.crud);
  lastTransaction = transaction;

  if (batch.length > 100) {
    break;
  }
}

if (batch.isNotEmpty) {
  await uploadBatch(batch);
  // Completing the last transaction also marks every earlier transaction
  // emitted by the stream as completed. Await it so that failures surface
  // to the caller instead of being dropped.
  await lastTransaction!.complete();
}

If there is no local data to upload, the stream completes without emitting any transactions (listeners only receive a single onDone event).

Implementation

/// Yields one [CrudTransaction] per group of pending `ps_crud` rows.
///
/// Each round queries the first `ps_crud` row whose id is greater than the
/// last id already emitted, then recursively appends the rows that follow it
/// directly (id + 1) and share its `tx_id`. The stream ends as soon as a
/// round returns no rows.
Stream<CrudTransaction> getCrudTransactions() async* {
  const query = '''
WITH RECURSIVE crud_entries AS (
SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?)
UNION ALL
SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud
  INNER JOIN crud_entries ON crud_entries.id + 1 = rowid
WHERE crud_entries.tx_id = ps_crud.tx_id
)
SELECT * FROM crud_entries;
''';

  // Id of the last entry handed to the consumer; -1 is the initial cursor
  // used for the first query.
  var cursor = -1;

  for (;;) {
    final rows = await getAll(query, [cursor]);
    if (rows.isEmpty) {
      // Nothing left after the cursor: close the stream.
      return;
    }

    final entries = rows.map((row) => CrudEntry.fromRow(row)).toList();
    final tail = entries.last;

    // The completion callback is bound to the last entry's id of this
    // transaction.
    yield CrudTransaction(
      crud: entries,
      complete: _crudCompletionCallback(tail.clientId),
      transactionId: tail.transactionId,
    );

    // Advance past this transaction only once the consumer asks for more.
    cursor = tail.clientId;
  }
}