runInXaTransaction<T extends Object> method
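Future<Result<T>>
runInXaTransaction<T extends Object>(String connectionId, Xid xid, Future<Result<T>> Function(XaTransactionHandle xa) action, {bool onePhase = false})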
override
Runs `action` inside a distributed XA / 2PC branch on `connectionId`.
Two-phase (default): `xa_start` → `action` → `xa_end` → `xa_prepare` →
`xa_commit_prepared`. Set `onePhase` to use `xa_commit_one_phase` after
`action` instead (single-RM shortcut only).
If `action` returns a `Failure`, a best-effort rollback is triggered; thrown
exceptions are converted to `QueryError` and also roll back, matching
`runInTransaction`.
Implementation
/// Runs [action] inside an XA branch opened for [xid] on [connectionId].
///
/// The branch is opened through `_repository.xaStart`; when that call
/// reports an error, the error is returned as a [Failure] and [action] is
/// never invoked.
///
/// Once [action] returns a successful result, the branch is completed in
/// one of two ways:
///
/// * [onePhase] is `false` (the default): `xa.end()`, `xa.prepare()` and
///   `xa.commitPrepared()` are called in that order, stopping at the first
///   one that returns `false`.
/// * [onePhase] is `true`: only `xa.commitOnePhase()` is called.
///
/// Returns the result of [action] when every completion step reports
/// success. If [action] returns an error result, the branch is aborted via
/// `_xaSafelyAbort` and that result is returned unchanged. A completion
/// step returning `false` yields a [Failure] wrapping a [QueryError] that
/// names the step and the xid. Anything thrown inside the guarded section
/// is caught, the branch is aborted via `_xaSafelyAbort`, and a [Failure]
/// wrapping a [QueryError] with the error and stack trace is returned.
@override
Future<Result<T>> runInXaTransaction<T extends Object>(
  String connectionId,
  Xid xid,
  Future<Result<T>> Function(XaTransactionHandle xa) action, {
  bool onePhase = false,
}) async {
  final opened = await _repository.xaStart(connectionId, xid);
  if (opened.isError()) {
    return Failure(opened.exceptionOrNull()!);
  }
  final xa = opened.getOrNull()!;

  // The guard deliberately spans the completion calls as well as the user
  // callback: a throw from any of them is reported as "action threw".
  try {
    final outcome = await action(xa);
    if (outcome.isError()) {
      await _xaSafelyAbort(xa);
      return outcome;
    }

    // Name of the first completion step that returned false, if any.
    String? failedStep;
    if (onePhase) {
      if (!xa.commitOnePhase()) {
        failedStep = 'xa_commit_one_phase';
      }
    } else if (!xa.end()) {
      failedStep = 'xa_end';
    } else if (!xa.prepare()) {
      failedStep = 'xa_prepare';
    } else if (!xa.commitPrepared()) {
      failedStep = 'xa_commit_prepared';
    }

    if (failedStep == null) {
      return outcome;
    }

    // NOTE(review): no `_xaSafelyAbort` is issued when a completion step
    // returns false — presumably the handle / resource manager cleans up
    // on its own; confirm against XaTransactionHandle.
    return Failure(
      QueryError(
        message: 'runInXaTransaction: $failedStep failed '
            'on xid=${xa.xid}',
      ),
    );
  } on Object catch (error, stackTrace) {
    await _xaSafelyAbort(xa);
    return Failure(
      QueryError(
        message: 'runInXaTransaction: action threw ${error.runtimeType}: '
            '$error\n$stackTrace',
      ),
    );
  }
}