performSnapshot method

  1. @visibleForTesting
Future<DateTime> performSnapshot()

Implementation

@visibleForTesting
Future<DateTime> performSnapshot() async {
  if (_debugPerformSnapshotFun != null) {
    return _debugPerformSnapshotFun!();
  }

  // assert a single call at a time
  if (_performingSnapshot) {
    throw SatelliteException(
      SatelliteErrorCode.internal,
      'already performing snapshot',
    );
  } else {
    _performingSnapshot = true;
  }

  try {
    final oplog = '${opts.oplogTable}';
    final shadow = '${opts.shadowTable}';
    final timestamp = DateTime.now();
    final newTag = _generateTag(timestamp);

    /*
   * IMPORTANT!
   *
   * The following queries make use of a documented but rare SQLite behaviour that allows selecting bare column
   * on aggregate queries: https://sqlite.org/lang_select.html#bare_columns_in_an_aggregate_query
   *
   * In short, when a query has a `GROUP BY` clause with a single `min()` or `max()` present in SELECT/HAVING,
   * then the "bare" columns (i.e. those not mentioned in a `GROUP BY` clause) are definitely the ones from the
   * row that satisfied that `min`/`max` function. We make use of it here to find first/last operations in the
   * oplog that touch a particular row.
   */

    // Update the timestamps on all "new" entries - they have been added but timestamp is still `NULL`
    final q1 = Statement(
      '''
    UPDATE $oplog SET timestamp = ${builder.makePositionalParam(1)}
    WHERE rowid in (
      SELECT rowid FROM $oplog
          WHERE timestamp is NULL
      ORDER BY rowid ASC
      )
    RETURNING *
  ''',
      [timestamp.toISOStringUTC()],
    );

    // We're adding new tag to the shadow tags for this row
    final q2 = Statement(
      '''
    UPDATE $oplog
    SET "clearTags" =
        CASE WHEN shadow.tags = '[]' OR shadow.tags = ''
             THEN '["' || ${builder.makePositionalParam(1)} || '"]'
             ELSE '["' || ${builder.makePositionalParam(2)} || '",' || substring(shadow.tags, 2)
        END
    FROM $shadow AS shadow
    WHERE $oplog.namespace = shadow.namespace
        AND $oplog.tablename = shadow.tablename
        AND $oplog."primaryKey" = shadow."primaryKey" AND $oplog.timestamp = ${builder.makePositionalParam(3)}
  ''',
      [
        newTag,
        newTag,
        timestamp.toISOStringUTC(),
      ],
    );

    // For each affected shadow row, set new tag array, unless the last oplog operation was a DELETE
    final q3 = Statement(
      builder.setTagsForShadowRows(opts.oplogTable, opts.shadowTable),
      [
        encodeTags([newTag]),
        timestamp.toISOStringUTC(),
      ],
    );

    // And finally delete any shadow rows where the last oplog operation was a `DELETE`
    final q4 = Statement(
      builder.removeDeletedShadowRows(opts.oplogTable, opts.shadowTable),
      [timestamp.toISOStringUTC()],
    );

    // Execute the four queries above in a transaction, returning the results from the first query
    // We're dropping down to this transaction interface because `runInTransaction` doesn't allow queries
    final oplogEntries =
        await adapter.transaction<List<OplogEntry>>((tx, setResult) {
      tx.query(q1, (tx, res) {
        if (res.isNotEmpty) {
          tx.run(
            q2,
            (tx, _) => tx.run(
              q3,
              (tx, _) => tx.run(
                q4,
                (_, __) => setResult(res.map(opLogEntryFromRow).toList()),
              ),
            ),
          );
        } else {
          setResult([]);
        }
      });
    });

    if (oplogEntries.isNotEmpty) {
      _notifyChanges(oplogEntries, ChangeOrigin.local);
    }

    if (client.getOutboundReplicationStatus() == ReplicationStatus.active) {
      final enqueued = client.getLastSentLsn();
      final enqueuedLogPos = bytesToNumber(enqueued);

      // TODO: handle case where pending oplog is large
      await getEntries(since: enqueuedLogPos).then(
        (missing) => _replicateSnapshotChanges(missing),
      );
    }
    return timestamp;
  } catch (e) {
    logger.error('error performing snapshot: $e');
    rethrow;
  } finally {
    _performingSnapshot = false;
  }
}