applyTransaction method

  1. @visibleForTesting
Future<void> applyTransaction(
  1. Transaction transaction
)

Implementation

@visibleForTesting
Future<void> applyTransaction(Transaction transaction) async {
  final namespace = builder.defaultNamespace;
  final origin = transaction.origin!;

  final commitTimestamp = DateTime.fromMillisecondsSinceEpoch(
    transaction.commitTimestamp.toInt(),
  );

  // Transactions coming from the replication stream
  // may contain DML operations manipulating data
  // but may also contain DDL operations migrating schemas.
  // DML operations are ran through conflict resolution logic.
  // DDL operations are applied as is against the local DB.

  // `stmts` will store all SQL statements
  // that need to be executed
  final stmts = <Statement>[];
  // `txStmts` will store the statements related to the transaction
  // including the creation of triggers
  // but not statements that disable/enable the triggers
  // neither statements that update meta tables or modify pragmas.
  // The `txStmts` is used to compute the hash of migration transactions
  final txStmts = <Statement>[];
  final tablenamesSet = <String>{};
  var newTables = <String>{};
  final opLogEntries = <OplogEntry>[];
  final lsn = transaction.lsn;
  bool firstDMLChunk = true;

  // Defer (SQLite) or temporarily disable FK checks (Postgres)
  // because order of inserts may not respect referential integrity
  // and Postgres doesn't let us defer FKs
  // that were not originally defined as deferrable
  stmts.add(Statement(builder.deferOrDisableFKsForTx));

  // update lsn.
  stmts.add(updateLsnStmt(lsn));
  stmts.add(_resetAllSeenStmt());

  Future<void> processDML(List<DataChange> changes) async {
    final tx = DataTransaction(
      commitTimestamp: transaction.commitTimestamp,
      lsn: transaction.lsn,
      changes: changes,
    );
    final entries = fromTransaction(tx, relations, namespace);

    // Before applying DML statements we need to assign a timestamp to pending operations.
    // This only needs to be done once, even if there are several DML chunks
    // because all those chunks are part of the same transaction.
    if (firstDMLChunk) {
      logger.info('apply incoming changes for LSN: ${base64.encode(lsn)}');
      // assign timestamp to pending operations before apply
      await mutexSnapshot();
      firstDMLChunk = false;
    }

    final applyRes = await apply(entries, origin);
    final statements = applyRes.statements;
    final tablenames = applyRes.tableNames;
    for (final e in entries) {
      opLogEntries.add(e);
    }
    for (final s in statements) {
      stmts.add(s);
    }
    for (final n in tablenames) {
      tablenamesSet.add(n);
    }
  }

  Future<void> processDDL(List<SchemaChange> changes) async {
    final createdTables = <String>{};
    final affectedTables = <String, MigrationTable>{};
    for (final change in changes) {
      stmts.add(Statement(change.sql));

      if (change.migrationType == SatOpMigrate_Type.CREATE_TABLE ||
          change.migrationType == SatOpMigrate_Type.ALTER_ADD_COLUMN) {
        // We will create/update triggers for this new/updated table
        // so store it in `tablenamesSet` such that those
        // triggers can be disabled while executing the transaction
        final affectedTable =
            QualifiedTablename(namespace, change.table.name).toString();

        // store the table information to generate the triggers after this `forEach`
        affectedTables[affectedTable] = change.table;
        tablenamesSet.add(affectedTable);

        if (change.migrationType == SatOpMigrate_Type.CREATE_TABLE) {
          createdTables.add(affectedTable);
        }
      }
    }

    // Also add statements to create the necessary triggers for the created/updated table
    for (final table in affectedTables.values) {
      final triggers = generateTriggersForTable(table, builder);
      stmts.addAll(triggers);
      txStmts.addAll(triggers);
    }

    // Disable the newly created triggers
    // during the processing of this transaction
    final createdQualifiedTables =
        createdTables.map(QualifiedTablename.parse).toList();
    stmts.addAll(_disableTriggers(createdQualifiedTables));
    newTables = <String>{...newTables, ...createdTables};
  }

  // Start with garbage collection, because if this a transaction after round-trip, then we don't want it in conflict resolution
  await maybeGarbageCollect(origin, commitTimestamp);

  // Chunk incoming changes by their types, and process each chunk one by one
  for (final (dataChange, chunk) in chunkBy(
    transaction.changes,
    (c, _, __) => c is DataChange,
  )) {
    if (dataChange) {
      await processDML(chunk.cast<DataChange>());
    } else {
      await processDDL(chunk.cast<SchemaChange>());
    }
  }

  // Now run the DML and DDL statements in-order in a transaction
  final tablenames = tablenamesSet.toList();
  final qualifiedTables = tablenames.map(QualifiedTablename.parse).toList();
  final notNewTableNames =
      tablenames.where((t) => !newTables.contains(t)).toList();
  final notNewQualifiedTables =
      notNewTableNames.map(QualifiedTablename.parse).toList();

  final allStatements = [
    ..._disableTriggers(notNewQualifiedTables),
    ...stmts,
    ..._enableTriggers(qualifiedTables),
  ];

  if (transaction.migrationVersion != null) {
    // If a migration version is specified
    // then the transaction is a migration
    await migrator.applyIfNotAlready(
      StmtMigration(
        statements: allStatements,
        version: transaction.migrationVersion!,
      ),
    );
  } else {
    await adapter.runInTransaction(allStatements);
  }

  _notifyChanges(opLogEntries, ChangeOrigin.remote);
}