in flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java [500:515]
private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(
KafkaZkClient zkClient, KafkaConsumer<String, byte[]> consumer, String topicStr) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
List<PartitionInfo> partitions = consumer.partitionsFor(topicStr);
for (PartitionInfo partition : partitions) {
TopicPartition topicPartition = new TopicPartition(topicStr, partition.partition());
Option<Object> optionOffset = zkClient.getConsumerOffset(groupId, topicPartition);
if (optionOffset.nonEmpty()) {
Long offset = (Long) optionOffset.get();
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset);
offsets.put(topicPartition, offsetAndMetadata);
}
}
return offsets;
}