findGroupTopicPartitionOffsets method

List<FkafkaPartition> findGroupTopicPartitionOffsets(
  String group,
  FkafkaTopic topic
)

Finds the offsets committed by the consumer group `group` for every partition of `topic`.

The resulting offsets are written back into the `FkafkaPartition.high` field of each entry in `topic.partitions`, and that same list is returned.

Implementation

/// Looks up the offsets committed by consumer group [group] for every
/// partition of [topic].
///
/// A temporary [FkafkaConsumerClient] configured with `GROUP_ID: group`
/// (merged with `_ccBaseConf`) is created for the lookup and released before
/// returning. The offset reported for each partition is written into
/// [FkafkaPartition.high] of the matching entry in `topic.partitions`, and
/// that same (mutated) list is returned.
///
/// [topic] must have a non-empty `partitions` list and a non-null
/// `partitionCount`; [group] must be non-empty (checked by `assert` only).
///
/// Throws a [StateError] if the native list reports a partition id that is
/// not present in `topic.partitions`. Native resources are released even
/// when an error is thrown.
List<FkafkaPartition> findGroupTopicPartitionOffsets(String group, FkafkaTopic topic) {
  assert(topic.partitions != null && topic.partitions!.isNotEmpty);
  assert(group.isNotEmpty);

  final List<FkafkaPartition> partitions = topic.partitions!;

  // Short-lived consumer client whose configuration carries the group id.
  final tempConsumer = FkafkaConsumerClient(
      conf: FkafkaConf(
          {
            GROUP_ID: group
          }..addAll(_ccBaseConf)
      )
  );

  try {
    final partitionList = _bridges.rd_kafka_topic_partition_list_new(
      topic.partitionCount!
    );

    try {
      // Encode the topic name once instead of once per partition, and free the
      // native buffer afterwards.
      // NOTE(review): this relies on librdkafka copying the topic string in
      // rd_kafka_topic_partition_list_add — confirm against the bound version.
      final topicName = topic.name.toNativeUtf8();
      try {
        for (final partition in partitions) {
          _bridges.rd_kafka_topic_partition_list_add(
            partitionList,
            topicName,
            partition.id
          );
        }
      } finally {
        malloc.free(topicName);
      }

      // Query the committed offsets; the native call fills `partitionList`.
      // NOTE(review): the return value of rd_kafka_committed is not inspected
      // here (same as before) — consider surfacing failures to the caller.
      _bridges.rd_kafka_committed(
          tempConsumer._kafkaPtr,
          partitionList,
          defaultTimeoutMs
      );

      // Copy each reported offset back onto the matching partition object.
      for (final elem in partitionList.ref.elemList) {
        final rowPartition = partitions.firstWhere((p) => p.id == elem.partition);
        rowPartition.high = elem.offset;
      }
    } finally {
      _bridges.rd_kafka_topic_partition_list_destroy(partitionList);
    }
  } finally {
    tempConsumer.release();
  }

  return partitions;
}