applyTransaction method
Implementation
@visibleForTesting
Future<void> applyTransaction(Transaction transaction) async {
  final namespace = builder.defaultNamespace;
  final origin = transaction.origin!;
  final commitTimestamp = DateTime.fromMillisecondsSinceEpoch(
    transaction.commitTimestamp.toInt(),
  );
  // Transactions coming from the replication stream
  // may contain DML operations manipulating data
  // but may also contain DDL operations migrating schemas.
  // DML operations are run through the conflict resolution logic.
  // DDL operations are applied as-is against the local DB.
  // `stmts` will store all SQL statements
  // that need to be executed.
  final stmts = <Statement>[];
  // `txStmts` will store the statements related to the transaction,
  // including the creation of triggers,
  // but not the statements that disable/enable the triggers,
  // nor the statements that update meta tables or modify pragmas.
  // `txStmts` is used to compute the hash of migration transactions.
  final txStmts = <Statement>[];
  final tablenamesSet = <String>{};
  var newTables = <String>{};
  final opLogEntries = <OplogEntry>[];
  final lsn = transaction.lsn;
  bool firstDMLChunk = true;
  // Defer (SQLite) or temporarily disable FK checks (Postgres),
  // because the order of inserts may not respect referential integrity
  // and Postgres doesn't let us defer FKs
  // that were not originally defined as deferrable.
  stmts.add(Statement(builder.deferOrDisableFKsForTx));
  // Update the LSN.
  stmts.add(updateLsnStmt(lsn));
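  // Reset the record of data seen so far; like the LSN update above, this is
  // bookkeeping state rather than user data (inferred from the name).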
  stmts.add(_resetAllSeenStmt());
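  // Applies a chunk of consecutive DML changes: converts them to oplog
  // entries and runs them through the conflict resolution logic.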
  Future<void> processDML(List<DataChange> changes) async {
    final tx = DataTransaction(
      commitTimestamp: transaction.commitTimestamp,
      lsn: transaction.lsn,
      changes: changes,
    );
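    // Convert the incoming changes into oplog entries,
    // using the known `relations` to describe each change.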
    final entries = fromTransaction(tx, relations, namespace);
    // Before applying DML statements we need to assign a timestamp to pending operations.
    // This only needs to be done once, even if there are several DML chunks,
    // because all those chunks are part of the same transaction.
    if (firstDMLChunk) {
      logger.info('apply incoming changes for LSN: ${base64.encode(lsn)}');
      // Assign a timestamp to pending operations before applying.
      await mutexSnapshot();
      firstDMLChunk = false;
    }
    final applyRes = await apply(entries, origin);
    final statements = applyRes.statements;
    final tablenames = applyRes.tableNames;
    for (final e in entries) {
      opLogEntries.add(e);
    }
    for (final s in statements) {
      stmts.add(s);
    }
    for (final n in tablenames) {
      tablenamesSet.add(n);
    }
  }
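  // Applies a chunk of consecutive DDL changes as-is and generates
  // (re)creation statements for the triggers of any created/altered tables.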
  Future<void> processDDL(List<SchemaChange> changes) async {
    final createdTables = <String>{};
    final affectedTables = <String, MigrationTable>{};
    for (final change in changes) {
      stmts.add(Statement(change.sql));
      if (change.migrationType == SatOpMigrate_Type.CREATE_TABLE ||
          change.migrationType == SatOpMigrate_Type.ALTER_ADD_COLUMN) {
        // We will create/update triggers for this new/updated table,
        // so store it in `tablenamesSet` such that those
        // triggers can be disabled while executing the transaction.
        final affectedTable =
            QualifiedTablename(namespace, change.table.name).toString();
        // Store the table information to generate the triggers after this loop.
        affectedTables[affectedTable] = change.table;
        tablenamesSet.add(affectedTable);
        if (change.migrationType == SatOpMigrate_Type.CREATE_TABLE) {
          createdTables.add(affectedTable);
        }
      }
    }
    // Also add statements to create the necessary triggers for the created/updated tables.
    for (final table in affectedTables.values) {
      final triggers = generateTriggersForTable(table, builder);
      stmts.addAll(triggers);
      txStmts.addAll(triggers);
    }
    // Disable the newly created triggers
    // during the processing of this transaction.
    final createdQualifiedTables =
        createdTables.map(QualifiedTablename.parse).toList();
    stmts.addAll(_disableTriggers(createdQualifiedTables));
    newTables = <String>{...newTables, ...createdTables};
  }
  // Start with garbage collection, because if this is a transaction coming
  // back after a round-trip, we don't want it to take part in conflict resolution.
  await maybeGarbageCollect(origin, commitTimestamp);
  // Chunk the incoming changes by their type and process each chunk in turn.
  for (final (dataChange, chunk) in chunkBy(
    transaction.changes,
    (c, _, __) => c is DataChange,
  )) {
    if (dataChange) {
      await processDML(chunk.cast<DataChange>());
    } else {
      await processDDL(chunk.cast<SchemaChange>());
    }
  }
  // Now run the DML and DDL statements in order, in a single transaction.
  final tablenames = tablenamesSet.toList();
  final qualifiedTables = tablenames.map(QualifiedTablename.parse).toList();
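  // Triggers on tables created inside this transaction were already disabled
  // right after their creation (see `processDDL`), so only pre-existing
  // tables need their triggers disabled up front; all triggers are
  // re-enabled at the end.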
  final notNewTableNames =
      tablenames.where((t) => !newTables.contains(t)).toList();
  final notNewQualifiedTables =
      notNewTableNames.map(QualifiedTablename.parse).toList();
  final allStatements = [
    ..._disableTriggers(notNewQualifiedTables),
    ...stmts,
    ..._enableTriggers(qualifiedTables),
  ];
  if (transaction.migrationVersion != null) {
    // If a migration version is specified,
    // then the transaction is a migration.
    await migrator.applyIfNotAlready(
      StmtMigration(
        statements: allStatements,
        version: transaction.migrationVersion!,
      ),
    );
  } else {
    await adapter.runInTransaction(allStatements);
  }
  _notifyChanges(opLogEntries, ChangeOrigin.remote);
}
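The `chunkBy` loop in the middle of the method splits `transaction.changes` into maximal runs of consecutive changes of the same kind, so a sequence like DML, DML, DDL, DML is processed as three chunks, in order. The sketch below illustrates that grouping with a hypothetical `chunkByFlag` helper; it is not the package's `chunkBy` utility, and the strings stand in for real `DataChange`/`SchemaChange` values.

// Illustration only: a hypothetical stand-in for the package's `chunkBy`,
// grouping consecutive items for which `flag` returns the same value.
Iterable<(bool, List<T>)> chunkByFlag<T>(
  List<T> items,
  bool Function(T) flag,
) sync* {
  if (items.isEmpty) return;
  var chunk = <T>[items.first];
  var current = flag(items.first);
  for (final item in items.skip(1)) {
    if (flag(item) == current) {
      chunk.add(item);
    } else {
      yield (current, chunk);
      chunk = [item];
      current = flag(item);
    }
  }
  yield (current, chunk);
}

void main() {
  // Stand-ins for DataChange ('dml:*') and SchemaChange ('ddl:*') values.
  final changes = ['dml:a', 'dml:b', 'ddl:x', 'dml:c'];
  for (final (isDml, chunk)
      in chunkByFlag(changes, (c) => c.startsWith('dml'))) {
    print(isDml ? 'DML chunk: $chunk' : 'DDL chunk: $chunk');
  }
  // Prints:
  //   DML chunk: [dml:a, dml:b]
  //   DDL chunk: [ddl:x]
  //   DML chunk: [dml:c]
}

Note that `processDML` and `processDDL` only collect statements: nothing touches the database until the single `migrator.applyIfNotAlready` or `adapter.runInTransaction` call at the end, which executes the statements in chunk order.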