periodic<T extends Object> method

Stream<T> periodic<T extends Object>(
  1. FutureOr<T> callback(),
  2. Duration interval
)

Creates a Stream that periodically executes a callback function.

The stream emits the result of the callback at each interval. The polling starts as soon as the stream is listened to and pauses/resumes or cancels automatically with the stream subscription.

This is useful for polling a resource, such as an API endpoint, to get periodic updates.

{@tool snippet}

int counter = 0;
Future<String> fetchData() async {
  await Future.delayed(Duration(milliseconds: 50));
  return 'Data packet #${++counter}';
}

final poller = StreamHelper.i.periodic(fetchData, Duration(seconds: 1));

final subscription = poller.listen(print);

// After 3 seconds, it will have printed 'Data packet #1', '#2', '#3'
await Future.delayed(Duration(seconds: 3, milliseconds: 100));
subscription.cancel();

{@end-tool}

Implementation

Stream<T> periodic<T extends Object>(
  FutureOr<T> Function() callback,
  Duration interval,
) {
  final controller = StreamController<T>();
  Timer? timer;
  void poll() {
    if (controller.isClosed) return;
    consec(
      callback(),
      (value) {
        if (!controller.isClosed) {
          controller.add(value);
        }
      },
      onError: (e, s) {
        if (!controller.isClosed) {
          controller.addError(e, s);
        }
      },
    );
  }

  void startTimer() {
    poll();
    timer = Timer.periodic(interval, (_) => poll());
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  controller
    ..onListen = startTimer
    ..onPause = stopTimer
    ..onResume = startTimer
    ..onCancel = stopTimer;
  return controller.stream;
}