waitForSignal method

Future<Object?> waitForSignal({
  required String workflowExecutionId,
  required String signalName,
  Duration? timeout,
})

Waits for a signal to be delivered to a workflow execution.

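Checks first for a signal that arrived before this call (stored as PENDING). If one is found, it is marked DELIVERED and its payload is returned immediately. Otherwise, creates a PENDING signal record, marks the workflow execution as suspended, and waits until deliverSignal is called.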

If timeout is provided and no signal is delivered before it elapses, the pending signal is marked EXPIRED and the returned future completes with a WorkflowTimeoutException.

Implementation

/// Waits for the signal [signalName] to arrive for [workflowExecutionId].
///
/// If a PENDING signal with that name is already stored, the first one is
/// marked delivered and its payload is returned immediately. Otherwise a
/// PENDING signal record is saved, the execution (when it can be loaded) is
/// marked [Suspended], and the returned future stays open until the
/// registered completer is completed elsewhere (`deliverSignal`) or the
/// timeout fires.
///
/// When [timeout] is non-null and elapses first, the matching pending signals
/// are marked expired and the future completes with a
/// [WorkflowTimeoutException]. If the store fails while recording the expiry,
/// the future completes with that store error instead of never completing.
Future<Object?> waitForSignal({
  required String workflowExecutionId,
  required String signalName,
  Duration? timeout,
}) async {
  // Check for an already-arrived signal (sent before waitForSignal was called).
  final existingSignals = await _store.loadPendingSignals(
    workflowExecutionId,
    signalName: signalName,
  );
  if (existingSignals.isNotEmpty) {
    final signal = existingSignals.first;
    await _store.saveSignal(
      signal.copyWith(status: SignalStatus.delivered),
    );
    return signal.payload;
  }

  // No signal yet — create a PENDING record and suspend.
  final now = utcNow();
  final pendingSignal = WorkflowSignal(
    workflowExecutionId: workflowExecutionId,
    signalName: signalName,
    status: SignalStatus.pending,
    createdAt: now,
  );
  await _store.saveSignal(pendingSignal);

  final execution = await _store.loadExecution(workflowExecutionId);
  if (execution != null) {
    await _store.saveExecution(
      execution.copyWith(
        status: const Suspended(),
        updatedAt: utcNow(),
      ),
    );
  }

  // NOTE(review): the completer is registered only after the awaits above, so
  // a delivery that happens during that store I/O may not see it — confirm
  // against deliverSignal before relying on this ordering.
  final key = _compositeKey(workflowExecutionId, signalName);
  final completer = Completer<Object?>();
  _completers[key] = completer;

  // Set up timeout if specified.
  if (timeout != null) {
    late final Timer timer;

    // Drops the timer entry only while it still refers to this timer, so a
    // newer wait on the same key keeps its own timer registered.
    void releaseTimer() {
      if (identical(_timeoutTimers[key], timer)) {
        _timeoutTimers.remove(key);
      }
    }

    timer = Timer(timeout, () async {
      // Guard: if deliverSignal() already completed this completer,
      // or if it was removed from the map, skip timeout handling.
      if (completer.isCompleted || !_completers.containsKey(key)) {
        releaseTimer();
        return;
      }

      Object error = WorkflowTimeoutException(
        workflowExecutionId: workflowExecutionId,
        signalName: signalName,
        timeout: timeout,
      );
      StackTrace? stackTrace;
      try {
        // Mark signal as EXPIRED.
        final signals = await _store.loadPendingSignals(
          workflowExecutionId,
          signalName: signalName,
        );
        for (final s in signals) {
          await _store.saveSignal(
            s.copyWith(status: SignalStatus.expired),
          );
        }
      } catch (e, st) {
        // This callback runs detached from any caller: forward the failure to
        // the waiter rather than leaving it as an uncaught zone error with a
        // future that never completes.
        error = e;
        stackTrace = st;
      }

      // The awaits above are an async gap; only remove the registration if it
      // is still ours, so a wait registered meanwhile is left untouched.
      if (identical(_completers[key], completer)) {
        _completers.remove(key);
      }
      releaseTimer();
      if (!completer.isCompleted) {
        completer.completeError(error, stackTrace);
      }
    });
    _timeoutTimers[key] = timer;
  }

  return completer.future;
}