runInXaTransaction<T extends Object> method

@override
Future<Result<T>> runInXaTransaction<T extends Object>(
  String connectionId,
  Xid xid,
  Future<Result<T>> action(XaTransactionHandle xa), {
  bool onePhase = false,
})
override

Runs action inside a distributed XA / 2PC branch on connectionId.

Two-phase (default): xa_start → action → xa_end → xa_prepare → xa_commit_prepared. Set onePhase to use xa_commit_one_phase after action instead (single-RM shortcut only).

action returning Failure triggers best-effort rollback; thrown exceptions are converted to QueryError and also roll back, matching runInTransaction.

Implementation

/// Runs [action] inside an XA branch started on [connectionId] for [xid].
///
/// The branch is opened through `_repository.xaStart`; if that fails, its
/// error is returned as a [Failure] and [action] is never invoked.
///
/// When [action] succeeds, the branch is completed in one of two ways:
///
/// * two-phase (default): `end`, then `prepare`, then `commitPrepared`;
/// * one-phase ([onePhase] is `true`): a single `commitOnePhase` call.
///
/// Returns the [Result] produced by [action] when every step succeeds.
/// Returns a [Failure] when:
///
/// * [action] itself returns a failure (returned unchanged);
/// * any completion step reports `false` (wrapped in a [QueryError] naming
///   the step and the xid);
/// * anything inside the guarded section throws (wrapped in a [QueryError]
///   carrying the exception type, message and stack trace).
///
/// In all three failure cases `_xaSafelyAbort` is awaited before returning,
/// so the branch is never left behind without a clean-up attempt.
@override
Future<Result<T>> runInXaTransaction<T extends Object>(
  String connectionId,
  Xid xid,
  Future<Result<T>> Function(XaTransactionHandle xa) action, {
  bool onePhase = false,
}) async {
  final startResult = await _repository.xaStart(connectionId, xid);
  if (startResult.isError()) {
    return Failure(startResult.exceptionOrNull()!);
  }
  final xa = startResult.getOrNull()!;

  // Aborts the branch, then reports which XA step returned `false`.
  // A `false` return used to skip the abort entirely, unlike the paths where
  // the action fails or something throws; this keeps all failures aligned.
  Future<Result<T>> failAt(String step) async {
    await _xaSafelyAbort(xa);
    return Failure(
      QueryError(
        message: 'runInXaTransaction: $step failed on xid=${xa.xid}',
      ),
    );
  }

  try {
    final userResult = await action(xa);
    if (userResult.isError()) {
      await _xaSafelyAbort(xa);
      return userResult;
    }

    if (onePhase) {
      if (!xa.commitOnePhase()) {
        return await failAt('xa_commit_one_phase');
      }
      return userResult;
    }

    if (!xa.end()) {
      return await failAt('xa_end');
    }
    if (!xa.prepare()) {
      return await failAt('xa_prepare');
    }
    if (!xa.commitPrepared()) {
      return await failAt('xa_commit_prepared');
    }
    return userResult;
  } on Object catch (e, st) {
    // Deliberately broad: any throwable from the action or from an XA step
    // is converted into a Failure after attempting to abort the branch.
    await _xaSafelyAbort(xa);
    return Failure(
      QueryError(
        message: 'runInXaTransaction: action threw ${e.runtimeType}: '
            '$e\n$st',
      ),
    );
  }
}