range static method

Stream<List<int>> range(
  Stream<List<int>> source, {
  int begin = 0,
  int? end,
})

Take a range of bytes from the source stream.

The begin of the range is inclusive and defaults to 0, which starts at the beginning of the source stream.

The end of the range is exclusive. If null is provided, bytes are taken until the end of the source is reached or the subscription is cancelled.

Implementation

/// Takes a range of bytes from [source].
///
/// [begin] is the inclusive offset of the first byte to forward and defaults
/// to 0. [end] is the exclusive offset of the last byte; when it is null,
/// bytes are forwarded until [source] is done or the returned stream's
/// subscription is cancelled. An [end] smaller than [begin] is treated as an
/// empty range instead of failing while slicing a chunk.
///
/// [source] is only listened to once the returned stream is listened to.
/// When [end] is given, the returned stream is closed and the subscription on
/// [source] is cancelled as soon as the byte at `end - 1` has been walked
/// past, without waiting for another chunk to arrive from [source].
static Stream<List<int>> range(Stream<List<int>> source, {int begin = 0, int? end}) {
  final StreamController<List<int>> controller = StreamController();
  late StreamSubscription<List<int>> subscription;
  final bool passThrough = begin == 0 && end == null;
  // Normalise an inverted range (end < begin) to an empty one so the slicing
  // below can never be asked for a sublist whose start is past its end.
  final int? stop = (end != null && end < begin) ? begin : end;
  final int? expected = stop == null ? null : stop - begin;
  int walked = 0;
  int transferred = 0;
  bool finished = false;

  // Stops consuming the source and closes the output, at most once.
  void finish() {
    if (finished) {
      return;
    }
    finished = true;
    // Cancel right away rather than after close() completes: the close future
    // only completes once the consumer has received the done event, which may
    // be arbitrarily late if the consumer is paused.
    unawaited(subscription.cancel());
    unawaited(controller.close());
  }

  // Returns the part of [data] that lies inside the requested range, or null
  // when no part of it does. Advances [walked] by the length of [data].
  List<int>? transformData(List<int> data) {
    if (passThrough) {
      return data;
    }
    final int blockBegin = walked;
    final int blockEnd = walked + data.length;
    walked = blockEnd;
    if (blockEnd <= begin) {
      // The whole block is before the range.
      return null;
    }
    if (stop != null && blockBegin >= stop) {
      // The whole block is after the range.
      return null;
    }
    final int sliceBegin = blockBegin < begin ? begin : blockBegin;
    final int sliceEnd = (stop != null && blockEnd > stop) ? stop : blockEnd;
    if (sliceBegin == blockBegin && sliceEnd == blockEnd) {
      // The block is entirely inside the range; forward it without copying.
      return data;
    }
    return data.sublist(sliceBegin - blockBegin, sliceEnd - blockBegin);
  }

  controller.onListen = () {
    subscription = source.listen((List<int> event) {
      if (finished || controller.isClosed) {
        return;
      }
      final List<int>? targetData = transformData(event);
      if (targetData != null) {
        controller.add(targetData);
        transferred += targetData.length;
        if (expected != null && transferred > expected) {
          log('ByteRangeStream transfer exceeded, expected:$expected, transferred:$transferred');
        }
      }
      if (stop != null && walked >= stop) {
        // Every byte of the range has been seen; do not wait for another
        // chunk (which may never come) before completing the output.
        finish();
      }
    }, onDone: () {
      controller.close();
    }, onError: (Object e, StackTrace s) {
      // Adding to a closed controller throws, so drop errors after close.
      if (!controller.isClosed) {
        controller.addError(e, s);
      }
    });
  };
  controller.onResume = () {
    subscription.resume();
  };
  controller.onPause = () {
    subscription.pause();
  };
  controller.onCancel = () async {
    unawaited(controller.close());
    return subscription.cancel();
  };
  return controller.stream;
}