runCheckpointStoreContractTests function

void runCheckpointStoreContractTests(
  1. CheckpointStore createStore()
)

Runs the full contract test suite against a CheckpointStore implementation.

createStore must return a fresh, empty store for each test.

Implementation

void runCheckpointStoreContractTests(
  CheckpointStore Function() createStore,
) {
  late CheckpointStore store;

  setUp(() {
    store = createStore();
  });

  group('CheckpointStore contract', () {
    group('Execution', () {
      test('saveExecution then loadExecution returns same data', () async {
        final exec = createTestExecution('exec-1');
        await store.saveExecution(exec);

        final loaded = await store.loadExecution('exec-1');
        expect(loaded, isNotNull);
        expect(loaded!.workflowExecutionId, equals('exec-1'));
        expect(loaded.status, isA<Running>());
        expect(loaded.currentStep, equals(0));
      });

      test('loadExecution returns null for non-existent ID', () async {
        final loaded = await store.loadExecution('non-existent');
        expect(loaded, isNull);
      });

      test('saveExecution updates existing execution', () async {
        final exec = createTestExecution('exec-1');
        await store.saveExecution(exec);

        final updated = exec.copyWith(
          status: const Completed(),
          currentStep: 3,
          updatedAt: nowTimestamp(),
        );
        await store.saveExecution(updated);

        final loaded = await store.loadExecution('exec-1');
        expect(loaded, isNotNull);
        expect(loaded!.status, isA<Completed>());
        expect(loaded.currentStep, equals(3));
      });

      test('loadExecutionsByStatus filters correctly', () async {
        await store.saveExecution(createTestExecution('exec-1',
            status: const Running()));
        await store.saveExecution(createTestExecution('exec-2',
            status: const Completed()));
        await store.saveExecution(createTestExecution('exec-3',
            status: const Running()));
        await store.saveExecution(createTestExecution('exec-4',
            status: const Failed()));

        final running = await store.loadExecutionsByStatus(
          [const Running()],
        );
        expect(running, hasLength(2));
        expect(
          running.map((e) => e.workflowExecutionId).toList()..sort(),
          equals(['exec-1', 'exec-3']),
        );
      });

      test('loadExecutionsByStatus with multiple statuses', () async {
        await store.saveExecution(createTestExecution('exec-1',
            status: const Running()));
        await store.saveExecution(createTestExecution('exec-2',
            status: const Completed()));
        await store.saveExecution(createTestExecution('exec-3',
            status: const Failed()));

        final results = await store.loadExecutionsByStatus(
          [const Running(), const Failed()],
        );
        expect(results, hasLength(2));
      });

      test('loadExecutionsByStatus returns empty for no matches', () async {
        await store.saveExecution(createTestExecution('exec-1',
            status: const Running()));

        final results = await store.loadExecutionsByStatus(
          [const Completed()],
        );
        expect(results, isEmpty);
      });
    });

    group('Checkpoint', () {
      test('saveCheckpoint then loadCheckpoints returns data', () async {
        await store.saveExecution(createTestExecution('exec-1'));

        final cp = createTestCheckpoint(
          workflowExecutionId: 'exec-1',
          stepIndex: 0,
          stepName: 'validate',
        );
        await store.saveCheckpoint(cp);

        final loaded = await store.loadCheckpoints('exec-1');
        expect(loaded, hasLength(1));
        expect(loaded.first.stepName, equals('validate'));
        expect(loaded.first.status, equals(StepStatus.completed));
      });

      test('loadCheckpoints returns ordered by stepIndex', () async {
        await store.saveExecution(createTestExecution('exec-1'));

        await store.saveCheckpoint(createTestCheckpoint(
          workflowExecutionId: 'exec-1', stepIndex: 2, stepName: 'step-2',
        ));
        await store.saveCheckpoint(createTestCheckpoint(
          workflowExecutionId: 'exec-1', stepIndex: 0, stepName: 'step-0',
        ));
        await store.saveCheckpoint(createTestCheckpoint(
          workflowExecutionId: 'exec-1', stepIndex: 1, stepName: 'step-1',
        ));

        final loaded = await store.loadCheckpoints('exec-1');
        expect(loaded, hasLength(3));
        expect(loaded[0].stepIndex, equals(0));
        expect(loaded[1].stepIndex, equals(1));
        expect(loaded[2].stepIndex, equals(2));
      });

      test('loadCheckpoints returns empty for non-existent execution',
          () async {
        final loaded = await store.loadCheckpoints('non-existent');
        expect(loaded, isEmpty);
      });

      test('checkpoints are scoped to execution', () async {
        await store.saveExecution(createTestExecution('exec-1'));
        await store.saveExecution(createTestExecution('exec-2'));

        await store.saveCheckpoint(createTestCheckpoint(
          workflowExecutionId: 'exec-1', stepIndex: 0, stepName: 'a',
        ));
        await store.saveCheckpoint(createTestCheckpoint(
          workflowExecutionId: 'exec-2', stepIndex: 0, stepName: 'b',
        ));

        final loaded1 = await store.loadCheckpoints('exec-1');
        final loaded2 = await store.loadCheckpoints('exec-2');
        expect(loaded1, hasLength(1));
        expect(loaded1.first.stepName, equals('a'));
        expect(loaded2, hasLength(1));
        expect(loaded2.first.stepName, equals('b'));
      });
    });

    group('Timer', () {
      test('saveTimer then loadPendingTimers returns pending timers',
          () async {
        await store.saveExecution(createTestExecution('exec-1'));

        final timer = createTestTimer('timer-1', 'exec-1');
        await store.saveTimer(timer);

        final loaded = await store.loadPendingTimers();
        expect(loaded, isNotEmpty);
        expect(
          loaded.any((t) => t.workflowTimerId == 'timer-1'),
          isTrue,
        );
      });

      test('fired timers are not returned by loadPendingTimers', () async {
        await store.saveExecution(createTestExecution('exec-1'));

        final timer = createTestTimer('timer-1', 'exec-1');
        await store.saveTimer(timer);

        await store.saveTimer(
          timer.copyWith(status: TimerStatus.fired),
        );

        final loaded = await store.loadPendingTimers();
        expect(
          loaded.any((t) => t.workflowTimerId == 'timer-1'),
          isFalse,
        );
      });
    });

    group('Signal', () {
      test('saveSignal then loadPendingSignals returns signals', () async {
        await store.saveExecution(createTestExecution('exec-1'));

        final signal = createTestSignal('exec-1');
        await store.saveSignal(signal);

        final loaded = await store.loadPendingSignals('exec-1');
        expect(loaded, isNotEmpty);
        expect(loaded.first.signalName, equals('test-signal'));
      });

      test('loadPendingSignals filters by signalName', () async {
        await store.saveExecution(createTestExecution('exec-1'));

        await store.saveSignal(
          createTestSignal('exec-1', signalName: 'approve'),
        );
        await store.saveSignal(
          createTestSignal('exec-1', signalName: 'reject'),
        );

        final loaded = await store.loadPendingSignals(
          'exec-1',
          signalName: 'approve',
        );
        expect(loaded, hasLength(1));
        expect(loaded.first.signalName, equals('approve'));
      });

      test('loadPendingSignals returns empty for wrong execution', () async {
        await store.saveExecution(createTestExecution('exec-1'));
        await store.saveSignal(createTestSignal('exec-1'));

        final loaded = await store.loadPendingSignals('exec-other');
        expect(loaded, isEmpty);
      });

      test('delivered signals are not returned by loadPendingSignals',
          () async {
        await store.saveExecution(createTestExecution('exec-1'));

        await store.saveSignal(createTestSignal('exec-1'));

        // Load the saved signal to get its auto-assigned ID
        final pending = await store.loadPendingSignals('exec-1');
        expect(pending, hasLength(1));

        // Update with the assigned ID to trigger upsert
        await store.saveSignal(
          pending.first.copyWith(status: SignalStatus.delivered),
        );

        final loaded = await store.loadPendingSignals('exec-1');
        expect(loaded, isEmpty);
      });
    });

    group('Cleanup', () {
      test('deleteExecution removes execution and associated data', () async {
        await store.saveExecution(createTestExecution('exec-1'));
        await store.saveCheckpoint(createTestCheckpoint(
          workflowExecutionId: 'exec-1',
          stepIndex: 0,
          stepName: 'step-0',
        ));
        await store.saveTimer(createTestTimer('timer-1', 'exec-1'));
        await store.saveSignal(createTestSignal('exec-1'));

        await store.deleteExecution('exec-1');

        expect(await store.loadExecution('exec-1'), isNull);
        expect(await store.loadCheckpoints('exec-1'), isEmpty);
        expect(
          (await store.loadPendingTimers())
              .where((t) => t.workflowExecutionId == 'exec-1'),
          isEmpty,
        );
        expect(await store.loadPendingSignals('exec-1'), isEmpty);
      });

      test('deleteExecution does not affect other executions', () async {
        await store.saveExecution(createTestExecution('exec-1'));
        await store.saveExecution(createTestExecution('exec-2'));
        await store.saveCheckpoint(createTestCheckpoint(
          workflowExecutionId: 'exec-1',
          stepIndex: 0,
          stepName: 'a',
        ));
        await store.saveCheckpoint(createTestCheckpoint(
          workflowExecutionId: 'exec-2',
          stepIndex: 0,
          stepName: 'b',
        ));

        await store.deleteExecution('exec-1');

        expect(await store.loadExecution('exec-1'), isNull);
        expect(await store.loadExecution('exec-2'), isNotNull);
        expect(await store.loadCheckpoints('exec-2'), hasLength(1));
      });

      test('deleteCompletedBefore removes old completed executions', () async {
        final now = DateTime.now().toUtc();
        final old = now.subtract(const Duration(hours: 2));
        final recent = now.subtract(const Duration(minutes: 30));

        await store.saveExecution(createTestExecution(
          'exec-old',
          status: const Completed(),
        ).copyWith(updatedAt: old.toIso8601String()));
        await store.saveExecution(createTestExecution(
          'exec-recent',
          status: const Completed(),
        ).copyWith(updatedAt: recent.toIso8601String()));
        await store.saveExecution(createTestExecution(
          'exec-running',
          status: const Running(),
        ).copyWith(updatedAt: old.toIso8601String()));

        final cutoff = now.subtract(const Duration(hours: 1));
        final deleted = await store.deleteCompletedBefore(cutoff);

        expect(deleted, 1);
        expect(await store.loadExecution('exec-old'), isNull);
        expect(await store.loadExecution('exec-recent'), isNotNull);
        expect(await store.loadExecution('exec-running'), isNotNull);
      });
    });
  });
}