audit method

Stream<T> audit(
  1. Duration duration
)

Audit a single event from each duration length period where there are events on this stream.

No events will ever be emitted within duration of another event on the result stream. If this stream is a broadcast stream, the result will be as well. Errors are forwarded immediately.

The first event will begin the audit period. At the end of the audit period the most recent event is emitted, and the next event restarts the audit period.

If the event that started the period is the one that is emitted it will be delayed by duration. If a later event comes in within the period it's delay will be shorter by the difference in arrival times.

If there is no pending event when this stream closes the output stream will close immediately. If there is a pending event the output stream will wait to emit it before closing.

For example:

source.audit(Duration(seconds: 5));

source: a------b--c----d--|
output: -----a------c--------d|

See also:

  • throttle, which emits the first event during the window, instead of the last event in the window. Compared to throttle, audit will introduce delay to forwarded events.
  • debounce, which only emits after the stream has not emitted for some period. Compared to debouce, audit cannot be starved by having events emitted continuously within duration.

Implementation

Stream<T> audit(Duration duration) {
  Timer? timer;
  var shouldClose = false;
  T recentData;

  return transformByHandlers(onData: (data, sink) {
    recentData = data;
    timer ??= Timer(duration, () {
      sink.add(recentData);
      timer = null;
      if (shouldClose) {
        sink.close();
      }
    });
  }, onDone: (sink) {
    if (timer != null) {
      shouldClose = true;
    } else {
      sink.close();
    }
  });
}