MessageSet.fromBytes constructor

MessageSet.fromBytes(
  1. KafkaBytesReader reader
)

Creates new MessageSet from provided data.

Implementation

factory MessageSet.fromBytes(KafkaBytesReader reader) {
  int messageSize = -1;
  var messages = Map<int, Message>();
  while (reader.isNotEOF) {
    try {
      int offset = reader.readInt64();
      messageSize = reader.readInt32();
      var crc = reader.readInt32();

      var data = reader.readRaw(messageSize - 4);
      var actualCrc = Crc32.signed(data);
      if (actualCrc != crc) {
        kafkaLogger.warning('Message CRC sum mismatch. Expected crc: ${crc}, actual: ${actualCrc}');
        throw new MessageCrcMismatchError('Expected crc: ${crc}, actual: ${actualCrc}');
      }
      var messageReader = new KafkaBytesReader.fromBytes(data);
      var message = _readMessage(messageReader);
      if (message.attributes?.compression == KafkaCompression.none) {
        messages[offset] = message;
      } else {
        if (message.attributes?.compression == KafkaCompression.snappy) throw new ArgumentError('Snappy compression is not supported yet by the client.');

        var codec = new GZipCodec();
        var innerReader = new KafkaBytesReader.fromBytes(codec.decode(message.value));
        var innerMessageSet = new MessageSet.fromBytes(innerReader);
        for (var innerOffset in innerMessageSet.messages.keys) {
          messages[innerOffset] = innerMessageSet.messages[innerOffset]!;
        }
      }
    } on RangeError {
      // According to spec server is allowed to return partial
      // messages, so we just ignore it here and exit the loop.
      var remaining = reader.length - reader.offset;
      kafkaLogger.info('Encountered partial message. Expected message size: ${messageSize}, bytes left in buffer: ${remaining}, total buffer size ${reader.length}');
      break;
    }
  }

  return new MessageSet._(messages);
}