getMetadata method

Future<ClusterMetadata> getMetadata(
  1. Set<String> topicNames, {
  2. bool invalidateCache = false,
})

Fetches Kafka cluster metadata. If topicNames is null then metadata for all topics will be returned.

Please note that requests to fetch all topics can not be cached by the client, so it may not be as performant as requesting topics explicitely.

Also, if Kafka server is configured to auto-create topics you must explicitely specify topic name in metadata request, otherwise topic will not be created.

Implementation

Future<ClusterMetadata> getMetadata(Set<String> topicNames, {bool invalidateCache = false}) async {
  if (topicNames.isEmpty) throw new ArgumentError.value(topicNames, 'topicNames', 'List of topic names can not be empty');

  if (invalidateCache) {
    _brokers = null;
    _topicsMetadata = new Map();
  }
  // TODO: actually rotate default hosts on failure.
  var contactPoint = _getCurrentContactPoint();

  var topicsToFetch = topicNames.where((t) => !_topicsMetadata.keys.contains(t));
  if (topicsToFetch.length > 0) {
    Future<MetadataResponse> responseFuture = _sendMetadataRequest(topicsToFetch.toSet(), contactPoint.host, contactPoint.port);
    for (var name in topicsToFetch) {
      _topicsMetadata[name] = responseFuture.then((response) {
        return response.topics.firstWhere((_) => _.topicName == name);
      });
    }

    _brokers = responseFuture.then((response) => response.brokers);
  }
  List<TopicMetadata> allMetadata = await Future.wait(_topicsMetadata.values);
  var metadata = allMetadata.where((_) => topicNames.contains(_.topicName));
  var brokers = await _brokers;

  return ClusterMetadata(brokers!, List.unmodifiable(metadata));
}