readStream method
Reads the next `size` elements from the chunked stream as a sub-stream.
This will pass-through chunks from the underlying chunked stream until
size
elements have been returned, or end-of-stream has been encountered.
If end-of-stream is encountered before `size` elements are read, the
returned sub-stream emits fewer than `size` elements in total (indicating
end-of-stream).
If the underlying stream throws, the stream is cancelled, the exception is propagated, and further read operations will fail.
If the sub-stream returned from `readStream` is cancelled, the remaining
unread elements up to `size` are drained, allowing subsequent
read operations to proceed after cancellation.
Throws a StateError if another read operation is ongoing.
Implementation
/// Returns a sub-stream that emits up to [size] elements, as chunks, taken
/// from the buffered chunk and then from the underlying `_input` iterator.
///
/// The [size] parameter is reused as a countdown of elements still owed to
/// the caller; it is shared by the generator and by the cancellation handler
/// so that whichever runs last knows how many elements remain.
///
/// Throws a [RangeError] if [size] is negative, and a [StateError] if
/// another read operation is already in progress (`_reading` is set).
Stream<List<T>> readStream(int size) {
RangeError.checkNotNegative(size, 'size');
if (_reading) {
throw StateError('Concurrent read operations are not allowed!');
}
// Mark the iterator busy before any data is produced; the flag is cleared
// by the generator on completion or by the onCancel handler below.
// NOTE(review): if the returned stream is never listened to, nothing visible
// here resets `_reading` — confirm callers always listen.
_reading = true;
// Generator that produces the chunks of the sub-stream.
Stream<List<T>> substream() async* {
// While we have data to read
// (with size == 0 this loop body never runs, so `_reading` is then only
// cleared by the onCancel handler).
while (size > 0) {
// Read something into the buffer, if buffer has been consumed.
assert(_offset <= _buffer.length);
if (_offset == _buffer.length) {
if (!(await _input.moveNext())) {
// Don't attempt to read more data, as there is no more data.
// The sub-stream ends early, having emitted fewer elements than requested.
size = 0;
_reading = false;
break;
}
_buffer = _input.current;
_offset = 0;
}
final remainingBuffer = _buffer.length - _offset;
// A freshly fetched chunk may be empty; in that case loop around and
// fetch the next one.
if (remainingBuffer > 0) {
if (remainingBuffer >= size) {
// The buffered chunk can satisfy the rest of the request: emit exactly
// `size` elements and keep the remainder buffered for later reads.
List<T> output;
if (_buffer is Uint8List) {
// For byte buffers, emit a view over the existing bytes rather than a copy.
output = Uint8List.sublistView(
_buffer as Uint8List, _offset, _offset + size) as List<T>;
} else {
output = _buffer.sublist(_offset, _offset + size);
}
// State is updated before the yield, so it is already consistent if the
// listener cancels at the yield and the statements after it never run.
_offset += size;
size = 0;
yield output;
_reading = false;
break;
}
// The buffered chunk is smaller than the request: emit all of what is
// left (the chunk itself when nothing has been consumed from it) and
// continue with the next chunk.
final output = _offset == 0 ? _buffer : _buffer.sublist(_offset);
size -= remainingBuffer;
_buffer = _emptyList;
_offset = 0;
yield output;
}
}
}
// The generator is only attached once the returned stream is listened to;
// the controller is closed when the generator's stream has been fully added.
final c = StreamController<List<T>>();
c.onListen = () => c.addStream(substream()).whenComplete(c.close);
// On cancellation, skip (without emitting) whatever part of the request
// has not been delivered yet, then release the read lock.
c.onCancel = () async {
// If `size` is already 0 there is nothing to drain and the loop is skipped.
while (size > 0) {
assert(_offset <= _buffer.length);
if (_buffer.length == _offset) {
if (!await _input.moveNext()) {
size = 0; // no more data
break;
}
_buffer = _input.current;
_offset = 0;
}
final remainingBuffer = _buffer.length - _offset;
if (remainingBuffer >= size) {
// The rest of the request ends inside this chunk: advance past it and stop.
_offset += size;
size = 0;
break;
}
// Discard the whole remaining chunk and keep draining.
size -= remainingBuffer;
_buffer = _emptyList;
_offset = 0;
}
_reading = false;
};
return c.stream;
}