window method
Buffers events into time-based windows defined by windowDuration.
All events occurring within each time window are collected into a list, which is then emitted when the window expires.
Example:
myStream.window(Duration(seconds: 1))
.listen((events) => print('Window contained ${events.length} items'));
Implementation
Stream<List<T>> window(Duration windowDuration) {
if (windowDuration.inMilliseconds <= 0) {
throw ArgumentError('Window duration must be greater than zero');
}
StreamController<List<T>>? controller;
final buffer = <T>[];
Timer? timer;
void emitBuffer() {
if (buffer.isNotEmpty) {
controller?.add(List<T>.from(buffer));
buffer.clear();
}
}
controller = StreamController<List<T>>(
onListen: () {
timer = Timer.periodic(windowDuration, (_) => emitBuffer());
final subscription = listen(
buffer.add,
onError: (dynamic error, dynamic stackTrace) {
timer?.cancel();
controller?._handleDynamicOnError(error, stackTrace);
},
onDone: () {
timer?.cancel();
emitBuffer();
controller?.close();
},
cancelOnError: false,
);
controller?.onCancel = () async {
timer?.cancel();
await subscription.cancel();
};
},
);
return controller.stream;
}