throttle method
Emits the first event of each duration window and suppresses the rest (throttle).
Implementation
Stream<T> throttle(Duration duration) {
if (duration <= Duration.zero) {
throw ArgumentError.value(duration, 'duration', 'must be greater than zero');
}
late StreamController<T> controller;
StreamSubscription<T>? subscription;
Timer? timer;
var suppressed = false;
void listenToSource() {
subscription = listen(
(value) {
if (suppressed) return;
controller.add(value);
suppressed = true;
timer?.cancel();
timer = Timer(duration, () => suppressed = false);
},
onError: controller.addError,
onDone: () {
timer?.cancel();
unawaited(controller.close());
},
);
}
Future<void> cancel() async {
timer?.cancel();
await subscription?.cancel();
}
controller =
isBroadcast
? StreamController<T>.broadcast(onListen: listenToSource, onCancel: cancel)
: StreamController<T>(onListen: listenToSource, onCancel: cancel);
return controller.stream;
}