readStream method Null safety

Stream<List<T>> readStream(
  int size
)

Read next size elements from chunked stream as a sub-stream.

This will pass-through chunks from the underlying chunked stream until size elements have been returned, or end-of-stream has been encountered.

If end-of-stream is encountered before size elements are read, the returned stream emits fewer than size elements in total (indicating end-of-stream).

If the underlying stream throws, the stream is cancelled, the exception is propagated and further read operations will fail.

If the sub-stream returned from readStream is cancelled, the remaining unread elements up to size are drained, allowing subsequent read operations to proceed after cancellation.

Throws a StateError if another read operation is ongoing.

Implementation

/// Reads the next [size] elements from the chunked stream as a sub-stream.
///
/// Chunks are forwarded from the current buffer and the underlying input
/// until [size] elements have been emitted or the input has no more chunks.
/// If the input ends early, the returned stream simply emits fewer than
/// [size] elements in total.
///
/// If the returned stream is cancelled before [size] elements were emitted,
/// the remaining elements are skipped so that a later read operation starts
/// right after the requested range.
///
/// Throws a [RangeError] if [size] is negative and a [StateError] if another
/// read operation is still in progress.
Stream<List<T>> readStream(int size) {
  RangeError.checkNotNegative(size, 'size');
  if (_reading) {
    throw StateError('Concurrent read operations are not allowed!');
  }
  _reading = true;

  // Generator producing the sub-stream. [size] is captured and counted down
  // as elements are emitted, so the cancel handler below can tell how many
  // elements are still outstanding.
  Stream<List<T>> substream() async* {
    // While we have data to read
    while (size > 0) {
      // Read something into the buffer, if buffer has been consumed.
      assert(_offset <= _buffer.length);
      if (_offset == _buffer.length) {
        if (!(await _input.moveNext())) {
          // Don't attempt to read more data, as there is no more data.
          size = 0;
          _reading = false;
          break;
        }
        _buffer = _input.current;
        _offset = 0;
      }

      final remainingBuffer = _buffer.length - _offset;
      if (remainingBuffer > 0) {
        if (remainingBuffer >= size) {
          // The current buffer holds everything that is still requested:
          // emit exactly [size] elements and keep the rest buffered.
          List<T> output;
          if (_buffer is Uint8List) {
            // Use a view on the existing Uint8List for the byte range.
            output = Uint8List.sublistView(
                _buffer as Uint8List, _offset, _offset + size) as List<T>;
          } else {
            output = _buffer.sublist(_offset, _offset + size);
          }
          // Update the bookkeeping before yielding, so the state is already
          // consistent if the generator does not resume after the yield.
          _offset += size;
          size = 0;
          yield output;
          _reading = false;
          break;
        }

        // The buffer holds fewer elements than requested: emit all of what
        // is left (the chunk itself when nothing of it was consumed yet) and
        // mark the buffer as empty before yielding.
        final output = _offset == 0 ? _buffer : _buffer.sublist(_offset);
        size -= remainingBuffer;
        _buffer = _emptyList;
        _offset = 0;
        yield output;
      }
    }
  }

  final c = StreamController<List<T>>();
  // Start generating lazily on first listen; close the controller once the
  // generator has completed.
  c.onListen = () => c.addStream(substream()).whenComplete(c.close);
  // On cancellation, skip whatever is left of the requested [size] elements
  // without emitting them, then allow new read operations.
  c.onCancel = () async {
    while (size > 0) {
      assert(_offset <= _buffer.length);
      if (_buffer.length == _offset) {
        if (!await _input.moveNext()) {
          size = 0; // no more data
          break;
        }
        _buffer = _input.current;
        _offset = 0;
      }

      final remainingBuffer = _buffer.length - _offset;
      if (remainingBuffer >= size) {
        // Enough buffered elements to finish skipping; keep the tail.
        _offset += size;
        size = 0;
        break;
      }

      // Discard the whole remaining buffer and continue with the next chunk.
      size -= remainingBuffer;
      _buffer = _emptyList;
      _offset = 0;
    }
    _reading = false;
  };

  return c.stream;
}