throttle method

Stream<T> throttle(Duration duration)

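Emits an event, then suppresses every following event until duration has elapsed since that emission (throttle). Each suppression window starts at an emitted event rather than on a fixed schedule.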

Implementation

/// Emits an event from this stream, then drops every later event until
/// [duration] has elapsed since that emission.
///
/// Errors from the source are forwarded immediately and are never throttled.
/// The returned stream closes when the source is done, and it is a broadcast
/// stream exactly when this stream is one.
///
/// Throws an [ArgumentError] if [duration] is not greater than
/// [Duration.zero].
Stream<T> throttle(Duration duration) {
  if (duration <= Duration.zero) {
    throw ArgumentError.value(
      duration,
      'duration',
      'must be greater than zero',
    );
  }

  late StreamController<T> controller;
  StreamSubscription<T>? subscription;
  Timer? timer;
  // True while a suppression window is open, i.e. between an emission and the
  // moment its timer fires.
  var suppressed = false;

  void listenToSource() {
    subscription = listen(
      (value) {
        if (suppressed) return;
        controller.add(value);
        suppressed = true;
        timer?.cancel();
        timer = Timer(duration, () => suppressed = false);
      },
      onError: controller.addError,
      onDone: () {
        timer?.cancel();
        unawaited(controller.close());
      },
    );
  }

  Future<void> cancel() async {
    timer?.cancel();
    timer = null;
    // The timer that would have reopened the gate is gone. Reset the flag so
    // that a later listener (broadcast controllers invoke onListen again
    // after onCancel) does not find the gate closed forever.
    suppressed = false;
    final active = subscription;
    subscription = null;
    await active?.cancel();
  }

  controller =
      isBroadcast
          ? StreamController<T>.broadcast(
            onListen: listenToSource,
            onCancel: cancel,
          )
          : StreamController<T>(
            onListen: listenToSource,
            // Forward back-pressure so a paused consumer also pauses the
            // source instead of letting events accumulate in the controller.
            onPause: () => subscription?.pause(),
            onResume: () => subscription?.resume(),
            onCancel: cancel,
          );
  return controller.stream;
}