MessageSet.fromBytes constructor
MessageSet.fromBytes(KafkaBytesReader reader)
Creates new MessageSet from provided data.
Implementation
/// Reads a [MessageSet] from [reader].
///
/// Each entry on the wire is `offset (int64) | messageSize (int32) |
/// crc (int32) | message body`. The CRC covers everything after the crc
/// field itself, i.e. `messageSize - 4` bytes.
///
/// Compressed messages (currently only GZip) carry a nested [MessageSet]
/// in their value; it is decoded recursively and its messages are lifted
/// into the top-level map under their inner offsets.
///
/// Per the Kafka protocol the broker may truncate the trailing message of
/// a fetch response, so a [RangeError] while reading is treated as a
/// partial message and simply ends the loop.
///
/// Throws [MessageCrcMismatchError] when a message fails its CRC check and
/// [ArgumentError] for Snappy-compressed messages (not supported yet).
factory MessageSet.fromBytes(KafkaBytesReader reader) {
  int messageSize = -1; // kept outside the loop so the partial-message log can report it
  var messages = Map<int, Message>();
  while (reader.isNotEOF) {
    try {
      int offset = reader.readInt64();
      messageSize = reader.readInt32();
      var crc = reader.readInt32();

      var data = reader.readRaw(messageSize - 4);
      var actualCrc = Crc32.signed(data);
      if (actualCrc != crc) {
        kafkaLogger.warning('Message CRC sum mismatch. Expected crc: ${crc}, actual: ${actualCrc}');
        throw new MessageCrcMismatchError('Expected crc: ${crc}, actual: ${actualCrc}');
      }

      var messageReader = new KafkaBytesReader.fromBytes(data);
      var message = _readMessage(messageReader);

      // NOTE(review): `attributes` may be null (the original used `?.`).
      // Previously a null-attributes message compared unequal to `none` and
      // fell into the decompression branch; treat missing attributes as
      // uncompressed instead — TODO confirm against the wire format.
      var compression = message.attributes?.compression ?? KafkaCompression.none;
      if (compression == KafkaCompression.none) {
        messages[offset] = message;
      } else {
        if (compression == KafkaCompression.snappy) {
          throw new ArgumentError('Snappy compression is not supported yet by the client.');
        }
        var codec = new GZipCodec();
        var innerReader = new KafkaBytesReader.fromBytes(codec.decode(message.value));
        var innerMessageSet = new MessageSet.fromBytes(innerReader);
        // Lift the nested set's messages to the top level, keyed by inner offset.
        messages.addAll(innerMessageSet.messages);
      }
    } on RangeError {
      // According to spec server is allowed to return partial
      // messages, so we just ignore it here and exit the loop.
      var remaining = reader.length - reader.offset;
      kafkaLogger.info('Encountered partial message. Expected message size: ${messageSize}, bytes left in buffer: ${remaining}, total buffer size ${reader.length}');
      break;
    }
  }

  return new MessageSet._(messages);
}