waitForSignal method

Future<Object?> waitForSignal({
  1. required String workflowExecutionId,
  2. required String signalName,
  3. Duration? timeout,
})

Waits for a signal to be delivered to a workflow execution.

Checks for already-delivered (PENDING) signals first. If found, consumes it immediately. Otherwise, creates a PENDING signal record and suspends execution until deliverSignal is called.

If timeout is provided, the signal will be marked EXPIRED and a TimeoutException will be thrown after the duration elapses.

Implementation

Future<Object?> waitForSignal({
  required String workflowExecutionId,
  required String signalName,
  Duration? timeout,
}) async {
  // Check for already-arrived signal (pre-sent before waitSignal was called)
  final existingSignals = await _store.loadPendingSignals(
    workflowExecutionId,
    signalName: signalName,
  );
  if (existingSignals.isNotEmpty) {
    final signal = existingSignals.first;
    await _store.saveSignal(
      signal.copyWith(status: SignalStatus.delivered),
    );
    return signal.payload;
  }

  // No signal yet — create a PENDING record and suspend
  final now = utcNow();
  final pendingSignal = WorkflowSignal(
    workflowExecutionId: workflowExecutionId,
    signalName: signalName,
    status: SignalStatus.pending,
    createdAt: now,
  );
  await _store.saveSignal(pendingSignal);

  final execution = await _store.loadExecution(workflowExecutionId);
  if (execution != null) {
    await _store.saveExecution(
      execution.copyWith(
        status: const Suspended(),
        updatedAt: utcNow(),
      ),
    );
  }

  final key = _compositeKey(workflowExecutionId, signalName);
  final completer = Completer<Object?>();
  _completers[key] = completer;

  // Set up timeout if specified
  if (timeout != null) {
    _timeoutTimers[key] = Timer(timeout, () async {
      if (!completer.isCompleted) {
        // Mark signal as EXPIRED
        final signals = await _store.loadPendingSignals(
          workflowExecutionId,
          signalName: signalName,
        );
        for (final s in signals) {
          await _store.saveSignal(
            s.copyWith(status: SignalStatus.expired),
          );
        }
        _completers.remove(key);
        _timeoutTimers.remove(key);
        completer.completeError(
          TimeoutException(
            'Signal "$signalName" timed out for execution '
            '$workflowExecutionId',
            timeout,
          ),
        );
      }
    });
  }

  return completer.future;
}