resetOffsetsToEarliest method

Future resetOffsetsToEarliest(
  Map<String, Set<int>> topicPartitions
)

Implementation

/// Resets the committed offsets for [topicPartitions] based on the earliest
/// offsets returned by [OffsetMaster.fetchEarliest].
///
/// [topicPartitions] maps each topic name to the set of partition ids whose
/// offsets should be reset.
///
/// Returns the result of [commitOffsets] for the computed offsets.
Future resetOffsetsToEarliest(Map<String, Set<int>> topicPartitions) async {
  final earliestOffsets =
      await OffsetMaster(session).fetchEarliest(topicPartitions);

  // Commit `earliest - 1` rather than the earliest offset itself: the consume
  // path always requests `currentOffset + 1` for the next message, and
  // committing the unadjusted value would lead to an endless stream of
  // "InvalidOffset" errors.
  final List<ConsumerOffset> resetOffsets = [
    for (final entry in earliestOffsets)
      ConsumerOffset(entry.topicName, entry.partitionId, entry.offset - 1,
          'resetToEarliest'),
  ];

  // NOTE(review): the meaning of the `-1` and `''` arguments is defined by
  // `commitOffsets`, which is not visible here — confirm against its docs.
  return commitOffsets(resetOffsets, -1, '');
}