rateLimit method

Stream<T> rateLimit(
  int maxEvents,
  Duration duration
)

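Limits the rate of event emissions, forwarding at most maxEvents events during each duration window. Events that arrive after the limit for the current window has been reached are dropped, not delayed.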

Example:

myStream.rateLimit(5, Duration(seconds: 1))
  .listen((data) => print(data));

Implementation

/// Returns a stream that forwards at most [maxEvents] data events from this
/// stream during each [duration] window.
///
/// Data events that arrive once the current window's quota is used up are
/// dropped; they are neither buffered nor delayed. A new window begins with
/// the first data event that arrives after the current window has lasted at
/// least [duration]. Errors are not counted against the quota, and the
/// returned stream closes when this stream is done.
///
/// This stream is only subscribed to once the returned stream gets a
/// listener, and that subscription is cancelled when the listener cancels.
///
/// Throws an [ArgumentError] if [maxEvents] is not positive, or if [duration]
/// is not at least one millisecond long.
Stream<T> rateLimit(int maxEvents, Duration duration) {
  if (maxEvents <= 0) {
    throw ArgumentError('maxEvents must be greater than zero');
  }
  if (duration.inMilliseconds <= 0) {
    throw ArgumentError('Duration must be greater than zero');
  }

  var eventCount = 0;
  // A Stopwatch is monotonic, so window boundaries are not affected by
  // adjustments to the system clock the way DateTime.now() would be.
  final windowTimer = Stopwatch();
  late final StreamController<T> controller;
  StreamSubscription<T>? subscription;

  controller = StreamController<T>(
    // Subscribe lazily: listening to the source before anyone listens to the
    // result would leave a subscription that nothing could ever cancel.
    onListen: () {
      windowTimer.start();
      subscription = listen(
        (data) {
          if (windowTimer.elapsed >= duration) {
            // The previous window has expired; this event opens a new one.
            eventCount = 0;
            windowTimer.reset();
          }
          if (eventCount < maxEvents) {
            eventCount++;
            controller.add(data);
          }
          // Otherwise the quota is exhausted and the event is dropped.
        },
        onError: (dynamic error, dynamic stackTrace) {
          controller._handleDynamicOnError(error, stackTrace);
        },
        onDone: controller.close,
        cancelOnError: false,
      );
    },
    onCancel: () async {
      await subscription?.cancel();
    },
  );

  return controller.stream;
}