window method

Stream<List<T>> window(
  Duration windowDuration
)

Buffers events into time-based windows defined by windowDuration.

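All events occurring within each time window are collected into a list, which is emitted when the window expires. A window in which no events arrived emits nothing, and any events still buffered when the source stream closes are emitted as a final list before the returned stream closes.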

Example:

myStream.window(Duration(seconds: 1))
  .listen((events) => print('Window contained ${events.length} items'));

Implementation

/// Buffers events from this stream into consecutive time windows of
/// [windowDuration] and emits each window's events as a list.
///
/// A periodic timer, started when the returned stream is first listened to,
/// marks the window boundaries. Windows in which no events arrived emit
/// nothing. When this stream closes, any events still buffered are emitted as
/// a final list before the returned stream closes.
///
/// Errors from this stream are handed to the controller through
/// `_handleDynamicOnError`; windowing continues afterwards because the
/// source subscription is not cancelled on error.
///
/// Cancelling the returned stream's subscription stops the timer and cancels
/// the subscription to this stream.
///
/// Throws an [ArgumentError] if [windowDuration] is shorter than one
/// millisecond.
Stream<List<T>> window(Duration windowDuration) {
  if (windowDuration.inMilliseconds <= 0) {
    throw ArgumentError('Window duration must be greater than zero');
  }
  StreamController<List<T>>? controller;
  final buffer = <T>[];
  Timer? timer;

  // Emits a snapshot of the buffered events; empty windows are skipped.
  void emitBuffer() {
    if (buffer.isNotEmpty) {
      // Copy before clearing so the emitted list keeps its contents.
      controller?.add(List<T>.from(buffer));
      buffer.clear();
    }
  }

  controller = StreamController<List<T>>(
    onListen: () {
      // Subscribe before starting the timer: if `listen` throws (for example
      // on an already-listened single-subscription stream), no periodic timer
      // is left running without an `onCancel` handler to stop it.
      final subscription = listen(
        buffer.add,
        onError: (dynamic error, dynamic stackTrace) {
          // The timer is deliberately left running. The source subscription
          // stays active (`cancelOnError: false`), so later events must still
          // be emitted at window boundaries instead of piling up until done.
          // NOTE(review): assumes `_handleDynamicOnError` forwards the error
          // without closing the controller — confirm against its definition.
          controller?._handleDynamicOnError(error, stackTrace);
        },
        onDone: () {
          timer?.cancel();
          // Flush the partial last window before closing.
          emitBuffer();
          controller?.close();
        },
        cancelOnError: false,
      );
      timer = Timer.periodic(windowDuration, (_) => emitBuffer());
      controller?.onCancel = () async {
        timer?.cancel();
        await subscription.cancel();
      };
    },
  );

  return controller.stream;
}