checkTaskExecutionStatusAsStream<T> method

Stream<AddonExecutionStatus<T>> checkTaskExecutionStatusAsStream<T>({
  required String requestId,
  required Future<AddonExecutionStatus<T>> task(String requestId),
  Duration checkInterval = const Duration(seconds: 5),
})

Implementation

/// Polls [task] for the status of [requestId] and streams every result.
///
/// [task] is invoked immediately with [requestId]. Each status it returns is
/// added to the returned stream. While the reported status is
/// [AddonExecutionStatusValue.InProgress], another poll is scheduled after
/// [checkInterval]; any other status closes the stream.
///
/// If [task] throws, the error and its stack trace are added to the stream,
/// the stream is closed, and no further polling takes place.
///
/// The returned stream is a broadcast stream: events emitted while nobody is
/// listening are not buffered, so subscribe right after calling this function.
Stream<AddonExecutionStatus<T>> checkTaskExecutionStatusAsStream<T>({
  required String requestId,
  required Future<AddonExecutionStatus<T>> Function(String requestId) task,
  Duration checkInterval = const Duration(seconds: 5),
}) {
  // Runs a single poll and, if the task is still in progress, schedules the
  // next one.
  Future<void> checker(
      StreamController<AddonExecutionStatus<T>> controller) async {
    final AddonExecutionStatus<T> status;

    try {
      status = await task(requestId);
    } catch (e, stackTrace) {
      controller.addError(e, stackTrace);
      controller.close();

      // Stop here: `status` was never assigned and the controller is closed,
      // so falling through would throw instead of ending the stream cleanly.
      return;
    }

    controller.add(status);

    if (status.status == AddonExecutionStatusValue.InProgress) {
      // Not finished yet: poll again after the configured interval.
      Timer(checkInterval, () => checker(controller));

      return;
    }

    // Any status other than InProgress ends the stream.
    controller.close();
  }

  final StreamController<AddonExecutionStatus<T>> controller =
      StreamController.broadcast();

  // Start the first poll right away; later polls are driven by the timer.
  checker(controller);

  return controller.stream;
}