getConsumerMetadata method

Future<GroupCoordinatorResponse> getConsumerMetadata(
  1. String consumerGroup
)

Fetches metadata for specified consumerGroup.

It handles ConsumerCoordinatorNotAvailableCode(15) API error which Kafka returns in case GroupCoordinatorRequest is sent for the very first time to this particular broker (when special topic to store consumer offsets does not exist yet).

It will attempt up to 5 retries (with linear delay) in order to fetch metadata.

Implementation

Future<GroupCoordinatorResponse> getConsumerMetadata(String consumerGroup) async {
  // TODO: rotate default hosts.
  var contactPoint = _getCurrentContactPoint();
  var request = new GroupCoordinatorRequest(consumerGroup);

  GroupCoordinatorResponse response = await _send(contactPoint.host, contactPoint.port, request);
  var retries = 1;
  var error = new KafkaServerError(response.errorCode);
  while (error.isConsumerCoordinatorNotAvailable && retries < 5) {
    var future = new Future.delayed(new Duration(seconds: retries), () => _send(contactPoint.host, contactPoint.port, request));

    response = await future;
    error = new KafkaServerError(response.errorCode);
    retries++;
  }

  if (error.isError) throw error;

  return response;
}