executeStep<T> method

Future<T> executeStep<T>(
  1. String name,
  2. Future<T> action(), {
  3. String serialize(
    1. T value
    )?,
  4. T deserialize(
    1. String data
    )?,
  5. RetryPolicy retryPolicy = RetryPolicy.none,
  6. Future<void> compensate(
    1. T result
    )?,
})

Executes a single step with checkpoint/resume semantics and retry support.

If a COMPLETED checkpoint exists for this step, returns the cached output. Otherwise records INTENT, executes action, and records COMPLETED. On failure, retries according to retryPolicy before recording FAILED.

If compensate is provided, it is registered for saga rollback.

The serialize and deserialize functions handle converting the step result to/from a JSON-encodable string for persistence.

Implementation

Future<T> executeStep<T>(
  String name,
  Future<T> Function() action, {
  String Function(T value)? serialize,
  T Function(String data)? deserialize,
  RetryPolicy retryPolicy = RetryPolicy.none,
  Future<void> Function(T result)? compensate,
}) async {
  if (_cancelled) {
    throw WorkflowCancelledException(_workflowExecutionId);
  }

  final currentIndex = _stepIndex++;
  final now = utcNow();

  // Check for existing COMPLETED checkpoint (replay)
  final existing = _findCompletedCheckpoint(currentIndex);
  if (existing != null) {
    // Detect dynamic step name mismatch
    if (existing.stepName != name) {
      onStepNameMismatch(
        _workflowExecutionId,
        currentIndex,
        existing.stepName,
        name,
      );
    }

    // Deserialize the cached result for replay
    T replayResult;
    if (existing.outputData != null && deserialize != null) {
      replayResult = deserialize(existing.outputData!);
    } else if (existing.outputData != null) {
      final decoded = jsonDecode(existing.outputData!);
      replayResult = decoded as T;
    } else {
      replayResult = null as T;
    }

    // Register compensate under the checkpointed name so that
    // SagaCompensator (which looks up by checkpoint stepName) can find it.
    if (compensate != null) {
      compensateFunctions[existing.stepName] =
          (dynamic v) => compensate(v as T);
      compensateResults[existing.stepName] = replayResult;
    }

    return replayResult;
  }

  // Register compensate function if provided (non-replay path).
  // The result will be stored after execution succeeds.
  Future<void> Function(dynamic)? wrappedCompensate;
  if (compensate != null) {
    wrappedCompensate = (dynamic v) => compensate(v as T);
  }

  final intentCheckpoint = StepCheckpoint(
    workflowExecutionId: _workflowExecutionId,
    stepIndex: currentIndex,
    stepName: name,
    status: StepStatus.intent,
    compensateRef: compensate != null ? name : null,
    startedAt: now,
  );
  await _store.saveCheckpoint(intentCheckpoint);

  try {
    final result = await _retryExecutor.executeWithRetry<T>(
      retryPolicy,
      action,
      delayFn: delayFn,
      onRetry: (attempt, error) async {
        // Record FAILED checkpoint for this attempt
        final failedCheckpoint = intentCheckpoint.copyWith(
          status: StepStatus.failed,
          attempt: attempt,
          errorMessage: error.toString(),
          completedAt: utcNow(),
        );
        await _store.saveCheckpoint(failedCheckpoint);
      },
    );

    // Register compensate with the result now that execution succeeded
    if (wrappedCompensate != null) {
      compensateFunctions[name] = wrappedCompensate;
      compensateResults[name] = result;
    }

    String? outputData;
    if (serialize != null) {
      outputData = serialize(result);
    } else if (result != null) {
      outputData = jsonEncode(result);
    }

    final completedCheckpoint = intentCheckpoint.copyWith(
      status: StepStatus.completed,
      outputData: outputData,
      compensateRef: compensate != null ? name : null,
      completedAt: utcNow(),
    );
    await _store.saveCheckpoint(completedCheckpoint);

    final execution = await _store.loadExecution(_workflowExecutionId);
    if (execution != null) {
      await _store.saveExecution(
        execution.copyWith(
          currentStep: currentIndex,
          updatedAt: utcNow(),
        ),
      );
    }

    return result;
  } catch (e) {
    if (e is WorkflowCancelledException) rethrow;

    final maxAttempt = switch (retryPolicy) {
      RetryPolicyNone() => 1,
      RetryPolicyFixed(maxAttempts: final max) => max,
      RetryPolicyExponential(maxAttempts: final max) => max,
    };
    final failedCheckpoint = intentCheckpoint.copyWith(
      status: StepStatus.failed,
      attempt: maxAttempt,
      errorMessage: e.toString(),
      completedAt: utcNow(),
    );
    await _store.saveCheckpoint(failedCheckpoint);

    final execution = await _store.loadExecution(_workflowExecutionId);
    if (execution != null) {
      await _store.saveExecution(
        execution.copyWith(
          status: const Failed(),
          errorMessage: e.toString(),
          updatedAt: utcNow(),
        ),
      );
    }

    rethrow;
  }
}