private Map getZookeeperOffsets()

in flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java [500:515]


  /**
   * Collects the consumer offsets that {@code zkClient} holds for every partition of a topic.
   *
   * <p>The partition list comes from {@code consumer.partitionsFor(topicStr)}; each partition
   * is then looked up through {@code zkClient.getConsumerOffset} using this instance's
   * {@code groupId}. Partitions for which no offset is returned are left out of the result.
   *
   * @param zkClient client used to look up the stored offset of each partition
   * @param consumer consumer used only to list the partitions of the topic
   * @param topicStr name of the topic whose offsets are wanted
   * @return a new mutable map from partition to offset; empty (never {@code null}) when no
   *         partition metadata is available for the topic or no offsets are stored
   */
  private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(
          KafkaZkClient zkClient, KafkaConsumer<String, byte[]> consumer, String topicStr) {

    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    List<PartitionInfo> partitions = consumer.partitionsFor(topicStr);
    if (partitions == null) {
      // Some Kafka client versions return null rather than an empty list when the topic is
      // unknown; report "no offsets" instead of failing with a NullPointerException below.
      return offsets;
    }
    for (PartitionInfo partition : partitions) {
      TopicPartition topicPartition = new TopicPartition(topicStr, partition.partition());
      Option<Object> optionOffset = zkClient.getConsumerOffset(groupId, topicPartition);
      if (optionOffset.nonEmpty()) {
        // The Scala Option carries the offset as a boxed Long; unbox it for OffsetAndMetadata.
        long offset = (Long) optionOffset.get();
        offsets.put(topicPartition, new OffsetAndMetadata(offset));
      }
    }
    return offsets;
  }