findGroupTopicPartitionOffsets method
Finds the offsets that the given consumer group has committed for each partition of the topic.
The resulting offsets are written back into the topic's partitions, in the field FkafkaPartition.high.
Implementation
/// Finds the offsets committed by consumer group [group] for every partition
/// of [topic].
///
/// A temporary [FkafkaConsumerClient] is created for the query, configured
/// with [group] as its group id merged with `_ccBaseConf`, and released before
/// returning. Each offset reported by `rd_kafka_committed` is written into the
/// `high` field of the [FkafkaPartition] whose id matches.
///
/// Returns `topic.partitions` (the same list instance, mutated in place).
///
/// In debug builds, asserts that [group] is non-empty and that [topic] has at
/// least one partition. Throws a [StateError] if the native list contains a
/// partition id that is not present in `topic.partitions`.
///
/// All native resources (topic-name string, partition list, temporary
/// consumer) are released even when an error is thrown part-way through.
List<FkafkaPartition> findGroupTopicPartitionOffsets(String group, FkafkaTopic topic) {
  assert(topic.partitions != null && topic.partitions!.isNotEmpty);
  assert(group.isNotEmpty);
  final List<FkafkaPartition> partitions = topic.partitions!;

  // Temporary consumer client that joins the group. Entries of `_ccBaseConf`
  // are applied after GROUP_ID, so a group id set there takes precedence.
  final tempConsumer = FkafkaConsumerClient(
    conf: FkafkaConf({GROUP_ID: group, ..._ccBaseConf}),
  );
  try {
    // The size argument is only an initial capacity hint, so the number of
    // partitions actually added is used.
    final nativeList =
        _bridges.rd_kafka_topic_partition_list_new(partitions.length);
    try {
      // librdkafka copies the topic name when an entry is added, so a single
      // native string is shared by all entries and freed right after the loop.
      // `malloc` is the default allocator used by `toNativeUtf8` (package:ffi).
      final nativeTopicName = topic.name.toNativeUtf8();
      try {
        for (final partition in partitions) {
          _bridges.rd_kafka_topic_partition_list_add(
            nativeList,
            nativeTopicName,
            partition.id,
          );
        }
      } finally {
        malloc.free(nativeTopicName);
      }

      // Query the committed offsets; results are filled into `nativeList`.
      // NOTE(review): the returned error code is ignored, as in the original
      // implementation — consider surfacing it to the caller.
      _bridges.rd_kafka_committed(
        tempConsumer._kafkaPtr,
        nativeList,
        defaultTimeoutMs,
      );

      // Back-fill the committed offset into the matching Dart partition.
      for (final elem in nativeList.ref.elemList) {
        final match = partitions.firstWhere((p) => elem.partition == p.id);
        match.high = elem.offset;
      }
    } finally {
      _bridges.rd_kafka_topic_partition_list_destroy(nativeList);
    }
  } finally {
    tempConsumer.release();
  }
  return partitions;
}