next property

  1. @override
Future<List<int>?> next
override

Returns the next full message that is received, or null if none are left.

Implementation

/// Returns the next full message that is received, or null if none are left.
///
/// Each message is preceded by a length prefix that is read one byte at a
/// time until a byte with its high bit (`0x80`) clear is seen; the collected
/// bytes are then decoded with `CodedBufferReader.readInt32`. A length of zero
/// yields an empty message immediately.
///
/// Parsing state (`_inputBuffer`, `_inputBufferPos`, `_readingLength`,
/// `_lengthBuffer`, `_message`, `_messagePos`) is kept in fields, so a message
/// that spans several input chunks, or several messages that share one chunk,
/// are handled across successive reads.
///
/// Any exception thrown while reading or decoding is swallowed and reported
/// as null, the same as end of input.
@override
Future<List<int>?> get next async {
  try {
    // Loop while there is data in the input buffer or the input stream.
    while (
        _inputBufferPos != _inputBuffer.length || await _inputQueue.hasNext) {
      // If the input buffer is exhausted, refill it from the input stream.
      if (_inputBufferPos == _inputBuffer.length) {
        _inputBuffer = await _inputQueue.next;
        _inputBufferPos = 0;
      }

      // Loop over the input buffer. Might return without reading the full
      // buffer if a message completes; the resume point is tracked in
      // `_inputBufferPos`.
      while (_inputBufferPos != _inputBuffer.length) {
        if (_readingLength) {
          // Reading the message length byte by byte.
          var byte = _inputBuffer[_inputBufferPos++];
          _lengthBuffer.add(byte);
          // A clear high bit marks the last byte of the length; decode it.
          if ((byte & 0x80) == 0) {
            var reader = CodedBufferReader(_lengthBuffer);
            var length = reader.readInt32();
            _lengthBuffer = [];

            // Special case: don't keep reading an empty message, return it
            // and `_readingLength` stays true.
            if (length == 0) {
              return Uint8List(0);
            }

            // Switch to reading raw data. Allocate the message buffer and
            // reset `_messagePos`.
            _readingLength = false;
            _message = Uint8List(length);
            _messagePos = 0;
          }
        } else {
          // Copy as much as possible from the input buffer. Limit is the
          // smaller of the remaining length to fill in the message and the
          // remaining length in the buffer.
          var lengthToCopy = min(_message.length - _messagePos,
              _inputBuffer.length - _inputBufferPos);
          // Pass the read offset as `skipCount` so the bytes are copied
          // straight out of `_inputBuffer` without an intermediate sublist.
          _message.setRange(_messagePos, _messagePos + lengthToCopy,
              _inputBuffer, _inputBufferPos);
          _messagePos += lengthToCopy;
          _inputBufferPos += lengthToCopy;

          // If there is a complete message to return, return it and switch
          // back to reading length.
          if (_messagePos == _message.length) {
            var result = _message;
            // Don't keep a reference to the message.
            _message = Uint8List(0);
            _readingLength = true;
            return result;
          }
        }
      }
    }

    // If there is nothing left in the queue then cancel the subscription.
    unawaited(cancel());
  } catch (_) {
    // It appears we sometimes get an exception instead of -1 as expected when
    // stdin closes, this handles that in the same way (returning a null
    // message).
    return null;
  }
  return null;
}