saveCheckpoint method
Persists a step checkpoint.
If a checkpoint with the same (workflowExecutionId, stepIndex, attempt) already exists, it should be updated.
Implementation
/// Persists [checkpoint], inserting it or replacing a matching entry.
///
/// Checkpoints are stored in `_checkpoints`, grouped by
/// `workflowExecutionId`. Within a group, an entry is considered the same
/// checkpoint when both its `stepIndex` and `attempt` match; such an entry is
/// replaced in place, otherwise [checkpoint] is appended.
///
/// Id handling:
/// * If [checkpoint] already carries an id, it is stored unchanged.
/// * If it has no id and replaces an existing entry, the existing entry's id
///   is reused so that an update does not change the record's identity.
/// * Otherwise a fresh id is taken from `_nextCheckpointId`.
@override
Future<void> saveCheckpoint(StepCheckpoint checkpoint) async {
  final checkpointsForExecution =
      _checkpoints.putIfAbsent(checkpoint.workflowExecutionId, () => []);

  // Look for an entry with the same (stepIndex, attempt) in this execution.
  final existingIndex = checkpointsForExecution.indexWhere(
    (cp) =>
        cp.stepIndex == checkpoint.stepIndex &&
        cp.attempt == checkpoint.attempt,
  );
  final existingId =
      existingIndex >= 0 ? checkpointsForExecution[existingIndex].id : null;

  // Only allocate a new id when neither the caller nor a previously stored
  // entry provides one; re-saving must not consume a new id.
  final saved = checkpoint.id != null
      ? checkpoint
      : checkpoint.copyWith(id: existingId ?? _nextCheckpointId++);

  if (existingIndex >= 0) {
    checkpointsForExecution[existingIndex] = saved;
  } else {
    checkpointsForExecution.add(saved);
  }
}