next property
Returns the next full message that is received, or null if none are left.
Implementation
/// Returns the next complete message, or `null` if none are left.
///
/// Each message is read as a length prefix followed by that many raw bytes.
/// The length prefix is accumulated byte by byte until a byte with its high
/// bit clear is seen, and is then decoded with [CodedBufferReader.readInt32].
///
/// Parsing state (`_readingLength`, `_lengthBuffer`, `_message`,
/// `_messagePos`, `_inputBuffer`, `_inputBufferPos`) is kept in fields, so a
/// message may span several input chunks and one chunk may hold several
/// messages; leftover bytes are consumed by the following call.
///
/// A zero-length message is returned as an empty [Uint8List]. Any exception
/// raised while reading is swallowed and reported as `null`.
@override
Future<List<int>?> get next async {
  try {
    // Loop while there is data in the input buffer or the input stream.
    while (
        _inputBufferPos != _inputBuffer.length || await _inputQueue.hasNext) {
      // If the input buffer is exhausted, refill it from the input stream.
      if (_inputBufferPos == _inputBuffer.length) {
        _inputBuffer = await _inputQueue.next;
        _inputBufferPos = 0;
      }

      // Loop over the input buffer. Might return without reading the full
      // buffer if a message completes; the position reached is tracked in
      // `_inputBufferPos` so the next call resumes from there.
      while (_inputBufferPos != _inputBuffer.length) {
        if (_readingLength) {
          // Reading the message length byte by byte.
          var byte = _inputBuffer[_inputBufferPos++];
          _lengthBuffer.add(byte);

          // A clear high bit marks the last byte of the length; decode it.
          if ((byte & 0x80) == 0) {
            var reader = CodedBufferReader(_lengthBuffer);
            var length = reader.readInt32();
            _lengthBuffer = [];

            // Special case: don't keep reading an empty message, return it
            // and `_readingLength` stays true.
            if (length == 0) {
              return Uint8List(0);
            }

            // Switch to reading raw data. Allocate the message buffer and
            // reset `_messagePos`.
            _readingLength = false;
            _message = Uint8List(length);
            _messagePos = 0;
          }
        } else {
          // Copy as much as possible from the input buffer. The limit is the
          // smaller of the bytes still missing from the message and the
          // bytes remaining in the buffer.
          var lengthToCopy = min(_message.length - _messagePos,
              _inputBuffer.length - _inputBufferPos);

          // Read straight out of `_inputBuffer` via `skipCount` instead of
          // allocating a temporary sublist for every chunk.
          _message.setRange(
            _messagePos,
            _messagePos + lengthToCopy,
            _inputBuffer,
            _inputBufferPos,
          );
          _messagePos += lengthToCopy;
          _inputBufferPos += lengthToCopy;

          // If the message is complete, return it and switch back to
          // reading a length.
          if (_messagePos == _message.length) {
            var result = _message;
            // Don't keep a reference to the message.
            _message = Uint8List(0);
            _readingLength = true;
            return result;
          }
        }
      }
    }

    // If there is nothing left in the queue then cancel the subscription.
    unawaited(cancel());
  } catch (e) {
    // It appears we sometimes get an exception instead of -1 as expected when
    // stdin closes, this handles that in the same way (returning a null
    // message).
    return null;
  }
  return null;
}