rateLimit method
Limits the rate of event emissions, forwarding at most maxEvents events
during each duration window. Events beyond that limit are dropped, not delayed.
Example:
myStream.rateLimit(5, Duration(seconds: 1))
.listen((data) => print(data));
Implementation
/// Limits how many events of this stream are forwarded per time window.
///
/// At most [maxEvents] data events are forwarded within each window of
/// length [duration]. Data events that arrive after the limit has been
/// reached in the current window are dropped, not delayed. A new window
/// opens with the first data event that arrives once the current window has
/// been running for at least [duration].
///
/// Errors and the done event of the source are forwarded and never count
/// toward the limit.
///
/// The source is subscribed to only once the returned stream is listened to,
/// and pausing, resuming or cancelling the returned stream's subscription
/// does the same to the source subscription.
///
/// Throws an [ArgumentError] if [maxEvents] or [duration] is not strictly
/// positive.
Stream<T> rateLimit(int maxEvents, Duration duration) {
  if (maxEvents <= 0) {
    throw ArgumentError('maxEvents must be greater than zero');
  }
  // Compare the Duration itself: `inMilliseconds` truncates toward zero and
  // would wrongly reject positive sub-millisecond windows.
  if (duration <= Duration.zero) {
    throw ArgumentError('Duration must be greater than zero');
  }

  var eventCount = 0;
  // A Stopwatch is monotonic, so system clock adjustments cannot stretch or
  // shrink a window the way DateTime.now() arithmetic can.
  final windowTimer = Stopwatch();
  final controller = StreamController<T>();
  StreamSubscription<T>? subscription;

  // Subscribe lazily so that a result stream nobody listens to does not keep
  // a dangling subscription on the source, and so that the first window
  // starts when events can actually be delivered.
  controller.onListen = () {
    windowTimer.start();
    subscription = listen(
      (data) {
        if (windowTimer.elapsed >= duration) {
          // The window has expired: this event opens the next one.
          eventCount = 0;
          windowTimer.reset();
        }
        if (eventCount < maxEvents) {
          eventCount++;
          controller.add(data);
        }
      },
      onError: (dynamic error, dynamic stackTrace) {
        controller._handleDynamicOnError(error, stackTrace);
      },
      onDone: controller.close,
      cancelOnError: false,
    );
  };
  // Propagate backpressure to the source instead of buffering without bound.
  controller.onPause = () => subscription?.pause();
  controller.onResume = () => subscription?.resume();
  // Returning the future lets the listener's cancel() wait for the source.
  controller.onCancel = () => subscription?.cancel();

  return controller.stream;
}