performSnapshot method
Implementation
/// Runs one snapshot pass over the oplog and returns the timestamp it used.
///
/// Every oplog entry whose `timestamp` is still `NULL` is stamped with the
/// current time. When at least one entry was stamped, the `clearTags` of
/// those entries and the matching shadow rows are updated in the same
/// transaction, and the stamped entries are reported through
/// `_notifyChanges` with [ChangeOrigin.local]. If outbound replication is
/// active, entries after the last sent LSN are handed to
/// `_replicateSnapshotChanges`.
///
/// If `_debugPerformSnapshotFun` is set, the call is delegated to it and
/// none of the above runs.
///
/// Throws a [SatelliteException] with [SatelliteErrorCode.internal] when a
/// snapshot is already in progress. Any other error is logged and rethrown.
@visibleForTesting
Future<DateTime> performSnapshot() async {
  // Test hook: when present it fully replaces the real implementation.
  final debugOverride = _debugPerformSnapshotFun;
  if (debugOverride != null) {
    return debugOverride();
  }

  // Only one snapshot may be in flight at a time.
  if (_performingSnapshot) {
    throw SatelliteException(
      SatelliteErrorCode.internal,
      'already performing snapshot',
    );
  }
  _performingSnapshot = true;

  try {
    final oplogName = '${opts.oplogTable}';
    final shadowName = '${opts.shadowTable}';
    final timestamp = DateTime.now();
    final newTag = _generateTag(timestamp);
    // The same serialized timestamp is bound in every statement below.
    final snapshotIso = timestamp.toISOStringUTC();

    // IMPORTANT!
    //
    // The queries below rely on a documented but rarely used SQLite
    // behaviour that permits selecting bare columns in aggregate queries:
    // https://sqlite.org/lang_select.html#bare_columns_in_an_aggregate_query
    //
    // In short: when a query has a `GROUP BY` clause and a single `min()` or
    // `max()` in SELECT/HAVING, the "bare" columns (those not named in the
    // `GROUP BY` clause) are taken from the row that satisfied that
    // `min`/`max`. This is used to find the first/last operation in the
    // oplog that touches a particular row.
    //
    // NOTE: the SQL text is kept flush-left on purpose so that the statement
    // strings stay exactly as they were.

    // Stamp all "new" entries, i.e. those added while `timestamp` was still
    // `NULL`, and return them.
    final stampPendingEntries = Statement(
      '''
UPDATE $oplogName SET timestamp = ${builder.makePositionalParam(1)}
WHERE rowid in (
SELECT rowid FROM $oplogName
WHERE timestamp is NULL
ORDER BY rowid ASC
)
RETURNING *
''',
      [snapshotIso],
    );

    // Prepend the new tag to the shadow tags recorded for each stamped row.
    final prependTagToClearTags = Statement(
      '''
UPDATE $oplogName
SET "clearTags" =
CASE WHEN shadow.tags = '[]' OR shadow.tags = ''
THEN '["' || ${builder.makePositionalParam(1)} || '"]'
ELSE '["' || ${builder.makePositionalParam(2)} || '",' || substring(shadow.tags, 2)
END
FROM $shadowName AS shadow
WHERE $oplogName.namespace = shadow.namespace
AND $oplogName.tablename = shadow.tablename
AND $oplogName."primaryKey" = shadow."primaryKey" AND $oplogName.timestamp = ${builder.makePositionalParam(3)}
''',
      [newTag, newTag, snapshotIso],
    );

    // For each affected shadow row, set the new tag array, unless the last
    // oplog operation on it was a DELETE.
    final setShadowTags = Statement(
      builder.setTagsForShadowRows(opts.oplogTable, opts.shadowTable),
      [encodeTags([newTag]), snapshotIso],
    );

    // Finally, drop shadow rows whose last oplog operation was a `DELETE`.
    final dropDeletedShadowRows = Statement(
      builder.removeDeletedShadowRows(opts.oplogTable, opts.shadowTable),
      [snapshotIso],
    );

    // Run the four statements in one transaction and yield the rows from the
    // first. The lower-level transaction interface is used because
    // `runInTransaction` doesn't allow queries.
    final oplogEntries =
        await adapter.transaction<List<OplogEntry>>((tx, setResult) {
      tx.query(stampPendingEntries, (tx, stampedRows) {
        // Nothing was pending: skip the follow-up statements entirely.
        if (stampedRows.isEmpty) {
          setResult([]);
          return;
        }

        tx.run(
          prependTagToClearTags,
          (tx, _) => tx.run(
            setShadowTags,
            (tx, _) => tx.run(
              dropDeletedShadowRows,
              (_, __) => setResult(
                stampedRows.map(opLogEntryFromRow).toList(),
              ),
            ),
          ),
        );
      });
    });

    if (oplogEntries.isNotEmpty) {
      _notifyChanges(oplogEntries, ChangeOrigin.local);
    }

    final outboundStatus = client.getOutboundReplicationStatus();
    if (outboundStatus == ReplicationStatus.active) {
      final enqueuedLogPos = bytesToNumber(client.getLastSentLsn());

      // TODO: handle case where pending oplog is large
      await getEntries(since: enqueuedLogPos).then(
        (pending) => _replicateSnapshotChanges(pending),
      );
    }

    return timestamp;
  } catch (e) {
    logger.error('error performing snapshot: $e');
    rethrow;
  } finally {
    // Always release the guard, whether the snapshot succeeded or failed.
    _performingSnapshot = false;
  }
}