resetOffsetsToEarliest method
Implementation
/// Rewinds the committed offsets of [topicPartitions] to just before the
/// earliest offsets reported for them.
///
/// [topicPartitions] maps each topic name to the set of partition ids to
/// reset. The earliest offsets are fetched through an [OffsetMaster] built on
/// the current `session`; one [ConsumerOffset] is created per result and the
/// whole batch is handed to `commitOffsets`.
///
/// Returns whatever `commitOffsets` returns for that batch.
Future resetOffsetsToEarliest(Map<String, Set<int>> topicPartitions) async {
  final master = OffsetMaster(session);
  final earliestOffsets = await master.fetchEarliest(topicPartitions);

  // Consumption always requests `currentOffset + 1` as the next message, so
  // the stored value has to be one less than the earliest offset. Storing the
  // earliest offset itself would lead to an endless loop of "InvalidOffset"
  // errors.
  final rewound = <ConsumerOffset>[
    for (final entry in earliestOffsets)
      ConsumerOffset(
        entry.topicName,
        entry.partitionId,
        entry.offset - 1,
        'resetToEarliest',
      ),
  ];

  // NOTE(review): the meaning of the `-1` and `''` arguments is defined by
  // `commitOffsets`, which is not visible here — confirm against it.
  return commitOffsets(rewound, -1, '');
}