claimRun method

@override
Future<bool> claimRun(
  String runId, {
  required String ownerId,
  Duration leaseDuration = const Duration(seconds: 30),
})

Attempts to claim a run for execution with a lease.

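Returns true when the claim succeeds. Returns false when the run cannot be claimed: it does not exist in this namespace, is not in the running state, is waiting on a topic, or already has a lease that has not yet expired.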

Implementation

/// Tries to take the execution lease on the run identified by [runId].
///
/// Inside a single transaction, updates the matching [StemWorkflowRun] row
/// only when all of the following hold:
///
/// * its `id` equals [runId] and its `namespace` equals this store's
///   namespace,
/// * its `status` is `WorkflowStatus.running.name`,
/// * its `waitTopic` is null,
/// * its `leaseExpiresAt` is null or at or before the current clock time.
///
/// On a match, `ownerId` is set to [ownerId], `leaseExpiresAt` to the
/// current UTC time plus [leaseDuration], and `updatedAt` to the current
/// UTC time.
///
/// Returns `true` when the update affected at least one row, and `false`
/// otherwise.
@override
Future<bool> claimRun(
  String runId, {
  required String ownerId,
  Duration leaseDuration = const Duration(seconds: 30),
}) async {
  // One timestamp is used for both the expiry comparison and the written
  // values, so the claim is evaluated against a single instant.
  final claimedAt = _clock.now().toUtc();
  final newLeaseExpiry = claimedAt.add(leaseDuration);

  final affectedRows = await _connections.runInTransaction(
    (ctx) async => ctx
        .query<StemWorkflowRun>()
        .whereEquals('id', runId)
        .whereEquals('namespace', namespace)
        .whereEquals('status', WorkflowStatus.running.name)
        .whereNull('waitTopic')
        .where((PredicateBuilder<StemWorkflowRun> lease) {
          // Claimable when there is no lease at all...
          lease.whereNull('leaseExpiresAt');
          // ...or when the existing lease expired at or before `claimedAt`.
          lease.orWhere(
            'leaseExpiresAt',
            claimedAt,
            PredicateOperator.lessThanOrEqual,
          );
        })
        .update({
          'ownerId': ownerId,
          'leaseExpiresAt': newLeaseExpiry,
          'updatedAt': claimedAt,
        }),
  );

  return affectedRows > 0;
}