range static method
Take a range of bytes from the source
stream.
The begin
of the range is inclusive and defaults to 0, which starts at the beginning of the source stream.
The end
of the range is exclusive; if a null value is provided, the taking walks forward until it reaches the end of the source or until the subscription is cancelled.
Implementation
/// Takes the bytes at offsets `[begin, end)` of [source] and re-emits them on
/// a new stream backed by its own [StreamController].
///
/// * [begin] is inclusive and defaults to 0 (the start of [source]).
/// * [end] is exclusive. When it is `null` the range is open-ended and data
///   is forwarded until [source] is done or the listener cancels.
///
/// [source] is only listened to once the returned stream gets a listener;
/// pause, resume and cancel on the returned stream are forwarded to the
/// source subscription. Errors from [source] are forwarded unchanged.
///
/// As soon as the last byte of a bounded range has been emitted the returned
/// stream is closed and the source subscription is cancelled, without waiting
/// for another chunk from [source]. A range with `end <= begin` yields no
/// data. In ranged mode zero-length slices are not emitted; with the default
/// arguments every chunk is passed through untouched.
static Stream<List<int>> range(Stream<List<int>> source, {int begin = 0, int? end}) {
  final StreamController<List<int>> controller = StreamController();
  // Assigned in onListen; the other controller callbacks and the data handler
  // can only run after that.
  late StreamSubscription<List<int>> subscription;
  final bool passThrough = begin == 0 && end == null;
  final int? expected = end == null ? null : end - begin;
  int walked = 0; // number of source bytes seen so far
  int transferred = 0; // number of bytes emitted so far
  bool finished = false;

  // Ends the output once the range is satisfied. The source is cancelled
  // right away instead of after `done` has been delivered, so a paused
  // consumer does not keep the source alive.
  void finish() {
    if (finished) {
      return;
    }
    finished = true;
    unawaited(subscription.cancel());
    unawaited(controller.close());
  }

  controller.onListen = () {
    subscription = source.listen((List<int> event) {
      if (finished || controller.isClosed) {
        return;
      }
      if (passThrough) {
        controller.add(event);
        return;
      }

      // Absolute offsets [chunkStart, chunkEnd) of this chunk in the source.
      final int chunkStart = walked;
      walked += event.length;
      final int chunkEnd = walked;

      // Intersect the chunk with the requested range.
      int from = chunkStart;
      if (from < begin) {
        from = begin;
      }
      int to = chunkEnd;
      if (end != null && to > end) {
        to = end;
      }

      // `from >= to` means the chunk lies outside the range (or the range is
      // empty/inverted); nothing is emitted and sublist is never called with
      // start > end.
      if (from < to) {
        final List<int> slice = (from == chunkStart && to == chunkEnd)
            ? event // whole chunk is in range: forward without copying
            : event.sublist(from - chunkStart, to - chunkStart);
        controller.add(slice);
        transferred += slice.length;
        if (expected != null && transferred > expected) {
          log('ByteRangeStream transfer exceeded, expected:$expected, transferred:$transferred');
        }
      }

      // The range is complete once the source offset reaches `end`.
      if (end != null && walked >= end) {
        finish();
      }
    }, onDone: () {
      controller.close();
    }, onError: (Object e, StackTrace s) {
      // Adding to a closed controller throws, so drop late errors.
      if (!controller.isClosed) {
        controller.addError(e, s);
      }
    });
  };
  controller.onResume = () {
    subscription.resume();
  };
  controller.onPause = () {
    subscription.pause();
  };
  controller.onCancel = () async {
    unawaited(controller.close());
    return subscription.cancel();
  };
  return controller.stream;
}