trackBytes function

Stream<List<int>> trackBytes(
  Stream<List<int>> source, {
  ProgressCallback? onProgress,
  Duration sample = const Duration(seconds: 1),
  ByteConverter? total,
  bool emitImmediate = true,
})

Wraps a byte stream, forwarding its chunks unchanged, and periodically reports throughput statistics to `onProgress`.

Implementation

/// Wraps [source] and reports throughput statistics while it is consumed.
///
/// Every chunk from [source] is forwarded unchanged on the returned
/// single-subscription stream, and errors are forwarded without ending it.
///
/// [source] is subscribed to only once the returned stream is listened to.
/// Pausing, resuming and cancelling the returned stream are propagated to
/// that subscription, and cancelling also stops the periodic reporting, so
/// no timer or subscription outlives the consumer.
///
/// When [onProgress] is non-null it is called with a [StreamProgress]:
///
/// * once when listening starts, if [emitImmediate] is true;
/// * once per [sample] interval while the stream is active;
/// * one final time when [source] completes.
///
/// The instantaneous rate covers the bytes received since the previous
/// report divided by the time that actually passed since then; the average
/// rate covers all bytes since listening started. Either rate is `0.0` when
/// its time span is zero. [total] is passed through to each report as-is.
Stream<List<int>> trackBytes(
  Stream<List<int>> source, {
  ProgressCallback? onProgress,
  Duration sample = const Duration(seconds: 1),
  ByteConverter? total,
  bool emitImmediate = true,
}) {
  final controller = StreamController<List<int>>();
  // Monotonic clock: unaffected by wall-clock adjustments during a transfer.
  final clock = Stopwatch();
  var transferred = 0.0;
  var windowBytes = 0.0;
  // Value of `clock.elapsed` at the previous report (start of the window).
  var windowStart = Duration.zero;
  StreamSubscription<List<int>>? subscription;
  Timer? timer;

  // Converts a byte count over a time span into bits per second.
  double bitsPerSecond(double bytes, Duration span) {
    final micros = span.inMicroseconds;
    return micros > 0 ? (bytes * 8.0) / (micros / 1e6) : 0.0;
  }

  void emit() {
    final callback = onProgress;
    if (callback == null) return;
    final elapsed = clock.elapsed;
    // Measure the real window length instead of assuming a full `sample`:
    // the final report on completion usually covers a partial window.
    final window = elapsed - windowStart;
    callback(StreamProgress(
      transferred: ByteConverter(transferred),
      instantaneous: DataRate.bitsPerSecond(
        bitsPerSecond(windowBytes, window),
      ),
      average: DataRate.bitsPerSecond(bitsPerSecond(transferred, elapsed)),
      elapsed: elapsed,
      total: total,
    ));
    windowBytes = 0.0;
    windowStart = elapsed;
  }

  void stopReporting() {
    timer?.cancel();
    timer = null;
    clock.stop();
  }

  controller
    ..onListen = () {
      clock.start();
      subscription = source.listen(
        (chunk) {
          transferred += chunk.length;
          windowBytes += chunk.length;
          controller.add(chunk);
        },
        onError: controller.addError,
        onDone: () {
          stopReporting();
          emit();
          controller.close();
        },
        cancelOnError: false,
      );
      // No callback means nothing to report, so no timer is needed.
      if (onProgress != null) {
        if (emitImmediate) emit();
        timer = Timer.periodic(sample, (_) => emit());
      }
    }
    ..onPause = (() => subscription?.pause())
    ..onResume = (() => subscription?.resume())
    ..onCancel = () {
      // The consumer went away: release the timer and the source.
      stopReporting();
      return subscription?.cancel();
    };

  return controller.stream;
}