waitForSignal method
Waits for a signal to be delivered to a workflow execution.
Checks for already-delivered (PENDING) signals first. If found, consumes it immediately. Otherwise, creates a PENDING signal record and suspends execution until deliverSignal is called.
If timeout is provided, the signal will be marked EXPIRED and a
TimeoutException will be thrown after the duration elapses.
Implementation
Future<Object?> waitForSignal({
required String workflowExecutionId,
required String signalName,
Duration? timeout,
}) async {
// Check for already-arrived signal (pre-sent before waitSignal was called)
final existingSignals = await _store.loadPendingSignals(
workflowExecutionId,
signalName: signalName,
);
if (existingSignals.isNotEmpty) {
final signal = existingSignals.first;
await _store.saveSignal(
signal.copyWith(status: SignalStatus.delivered),
);
return signal.payload;
}
// No signal yet — create a PENDING record and suspend
final now = utcNow();
final pendingSignal = WorkflowSignal(
workflowExecutionId: workflowExecutionId,
signalName: signalName,
status: SignalStatus.pending,
createdAt: now,
);
await _store.saveSignal(pendingSignal);
final execution = await _store.loadExecution(workflowExecutionId);
if (execution != null) {
await _store.saveExecution(
execution.copyWith(
status: const Suspended(),
updatedAt: utcNow(),
),
);
}
final key = _compositeKey(workflowExecutionId, signalName);
final completer = Completer<Object?>();
_completers[key] = completer;
// Set up timeout if specified
if (timeout != null) {
_timeoutTimers[key] = Timer(timeout, () async {
// Guard: if deliverSignal() already completed this completer,
// or if it was removed from the map, skip timeout handling.
if (completer.isCompleted || !_completers.containsKey(key)) {
_timeoutTimers.remove(key);
return;
}
// Mark signal as EXPIRED
final signals = await _store.loadPendingSignals(
workflowExecutionId,
signalName: signalName,
);
for (final s in signals) {
await _store.saveSignal(
s.copyWith(status: SignalStatus.expired),
);
}
_completers.remove(key);
_timeoutTimers.remove(key);
if (!completer.isCompleted) {
completer.completeError(
WorkflowTimeoutException(
workflowExecutionId: workflowExecutionId,
signalName: signalName,
timeout: timeout,
),
);
}
});
}
return completer.future;
}