getConsumerMetadata method
Fetches metadata for the specified consumerGroup.
It handles the ConsumerCoordinatorNotAvailableCode(15) API error, which Kafka
returns when a GroupCoordinatorRequest is sent to a particular broker for the
very first time (that is, when the special topic that stores consumer offsets
does not exist yet).
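It makes up to 5 attempts in total to fetch metadata: the initial request plus up to 4 retries, with a linear delay of 1, 2, 3, and 4 seconds before each successive retry. If the last attempt still returns an error, that error is thrown.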
Implementation
Future<GroupCoordinatorResponse> getConsumerMetadata(String consumerGroup) async {
  // TODO: rotate default hosts.
  var contactPoint = _getCurrentContactPoint();
  var request = new GroupCoordinatorRequest(consumerGroup);
  GroupCoordinatorResponse response =
      await _send(contactPoint.host, contactPoint.port, request);
  var retries = 1;
  var error = new KafkaServerError(response.errorCode);
  while (error.isConsumerCoordinatorNotAvailable && retries < 5) {
    var future = new Future.delayed(new Duration(seconds: retries),
        () => _send(contactPoint.host, contactPoint.port, request));
    response = await future;
    error = new KafkaServerError(response.errorCode);
    retries++;
  }
  if (error.isError) throw error;
  return response;
}
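
The retry loop in this method can be tried in isolation with the minimal sketch below. It is an illustration under stated assumptions, not part of the library: `fetchWithLinearRetry`, `coordinatorNotAvailable`, and `fakeSend` are hypothetical names, the "response" is reduced to a bare error code, and the delay step is shortened to milliseconds so the example finishes quickly.

```dart
import 'dart:async';

/// Hypothetical stand-in for Kafka error code 15
/// (consumer coordinator not available).
const int coordinatorNotAvailable = 15;

/// Calls [send] once, then retries while it keeps returning
/// [coordinatorNotAvailable], waiting `step * attempt` before each retry
/// (a linear delay). At most [maxAttempts] calls are made in total.
/// Returns the last error code received.
Future<int> fetchWithLinearRetry(
  Future<int> Function() send, {
  int maxAttempts = 5,
  Duration step = const Duration(seconds: 1),
}) async {
  var code = await send();
  var attempt = 1;
  while (code == coordinatorNotAvailable && attempt < maxAttempts) {
    await Future<void>.delayed(step * attempt);
    code = await send();
    attempt++;
  }
  return code;
}

Future<void> main() async {
  var calls = 0;
  // Fake broker: reports "coordinator not available" twice, then succeeds.
  Future<int> fakeSend() async => ++calls < 3 ? coordinatorNotAvailable : 0;

  final code = await fetchWithLinearRetry(
    fakeSend,
    step: const Duration(milliseconds: 10),
  );
  print('code=$code calls=$calls'); // prints "code=0 calls=3"
}
```

With the defaults (`maxAttempts: 5`, one-second step) the sketch waits 1, 2, 3, and 4 seconds before the successive retries, which matches the loop bounds in the implementation above.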