in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Util.java [46:87]
public static void printKafkaInfo(String host, String groupId) {
// Initialize the Kafka Admin Client
System.out.println("Kafka Info: " + host);
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, host);
try (AdminClient adminClient = AdminClient.create(adminProps)) {
// Get list of topics
Set<String> topicNames = adminClient.listTopics(new ListTopicsOptions().listInternal(false)).names().get();
System.out.println("Live Topics: " + topicNames);
// Initialize the Kafka Consumer Client to fetch offsets
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MirroredSolrRequestSerializer.class.getName());
try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
for (String topic : topicNames) {
Set<TopicPartition> topicPartitions = consumer.assignment();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
System.out.println("Topic Partitions: " + topicPartitions.size());
for (TopicPartition topicPartition : topicPartitions) {
if (topicPartition.topic().equals(topic)) {
long endOffset = consumer.position(topicPartition);
long committedOffset = consumer.committed(topicPartition).offset();
long updatesInQueue = endOffset - committedOffset;
offsets.put(topicPartition, new OffsetAndMetadata(endOffset));
System.out.println("Topic: " + topic);
System.out.println(" Partition: " + topicPartition.partition());
System.out.println(" End Offset: " + endOffset);
System.out.println(" Committed Offset: " + committedOffset);
System.out.println(" Updates in Queue: " + updatesInQueue);
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}