suspendOnTopic method
Suspends the run identified by runId while it awaits an event on topic.
If deadline is provided, the run should be considered due at that time
even if an event is never received.
Implementation
/// Marks the run identified by [runId] as suspended, waiting on [topic].
///
/// The optional [deadline] is written as the run's resume time and is also
/// handed to `_prepareSuspensionData` together with [data] and [topic]; the
/// metadata it returns is stored JSON-encoded. The lookup and the update
/// happen inside a single `runInTransaction` callback. When no run matches
/// [runId] within `namespace`, nothing is written.
///
/// [stepName] is accepted for interface compatibility but is not read here.
@override
Future<void> suspendOnTopic(
  String runId,
  String stepName,
  String topic, {
  DateTime? deadline,
  Map<String, Object?>? data,
}) async {
  final timestamp = _clock.now().toUtc();
  final suspensionInfo = _prepareSuspensionData(
    data,
    resumeAt: deadline,
    deadline: deadline,
    topic: topic,
  );

  await _connections.runInTransaction((ctx) async {
    final existing = await ctx
        .query<StemWorkflowRun>()
        .whereEquals('id', runId)
        .whereEquals('namespace', namespace)
        .first();
    // Unknown run (or wrong namespace): leave the store untouched.
    if (existing == null) return;

    final changes = StemWorkflowRunUpdateDto(
      status: WorkflowStatus.suspended.name,
      waitTopic: topic,
      resumeAt: deadline,
      suspensionData: jsonEncode(suspensionInfo),
      updatedAt: timestamp,
    ).toMap()
      // NOTE(review): presumably toMap() omits null fields, so the column is
      // assigned explicitly to keep a null deadline in the update — confirm.
      ..['resume_at'] = deadline;

    final runs = ctx.repository<StemWorkflowRun>();
    await runs.update(
      changes,
      where: StemWorkflowRunPartial(id: runId, namespace: namespace),
    );
  });
}