toBytes method

  1. @override
List<int> toBytes()
override

Implementation

@override
List<int> toBytes() {
  var builder = new KafkaBytesBuilder.withRequestHeader(apiKey, apiVersion, correlationId);
  builder.addInt16(requiredAcks);
  builder.addInt32(timeout);

  Map<String, Map<int, MessageSet>> messageSets = new Map();
  for (var envelope in messages) {
    if (!messageSets.containsKey(envelope.topicName)) {
      messageSets[envelope.topicName] = new Map<int, MessageSet>();
    }
    messageSets[envelope.topicName]?[envelope.partitionId] = new MessageSet.build(envelope);
  }

  builder.addInt32(messageSets.length);
  messageSets.forEach((topicName, partitions) {
    builder.addString(topicName);
    builder.addInt32(partitions.length);
    partitions.forEach((partitionId, messageSet) {
      builder.addInt32(partitionId);
      var messageData = messageSet.toBytes();
      builder.addInt32(messageData.length);
      builder.addRaw(messageData);
    });
  });

  var body = builder.takeBytes();
  builder.addBytes(body);

  return builder.takeBytes();
}