public static void printKafkaInfo()

in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Util.java [46:87]


    /**
     * Prints a summary of a Kafka cluster to standard output: the non-internal topics and,
     * for every partition of each topic, the log end offset, the offset last committed by
     * the given consumer group, and the difference between the two.
     *
     * <p>Failures are reported by printing the stack trace; nothing is thrown to the caller.
     *
     * @param host    the Kafka bootstrap servers string passed to both the admin and consumer clients
     * @param groupId the consumer group whose committed offsets are reported
     */
    public static void printKafkaInfo(String host, String groupId) {
        // Initialize the Kafka Admin Client
        System.out.println("Kafka Info: " + host);
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, host);
        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            // Get list of topics
            Set<String> topicNames = adminClient.listTopics(new ListTopicsOptions().listInternal(false)).names().get();
            System.out.println("Live Topics: " + topicNames);

            // Initialize the Kafka Consumer Client to fetch offsets
            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MirroredSolrRequestSerializer.class.getName());

            // The consumer is used for metadata and offset lookups only; it never subscribes or
            // polls, so the key/value types are irrelevant here.
            try (Consumer<?, ?> consumer = new KafkaConsumer<>(consumerProps)) {
                for (String topic : topicNames) {
                    // assignment() is always empty for a consumer that never subscribed, so the
                    // partitions have to be discovered from topic metadata instead.
                    Set<TopicPartition> topicPartitions = new java.util.LinkedHashSet<>();
                    // partitionsFor may return null for an unknown topic on older client versions.
                    java.util.Optional.ofNullable(consumer.partitionsFor(topic)).ifPresent(infos ->
                            infos.forEach(info -> topicPartitions.add(new TopicPartition(info.topic(), info.partition()))));
                    System.out.println("Topic Partitions: " + topicPartitions.size());
                    if (topicPartitions.isEmpty()) {
                        continue;
                    }

                    // Fetch both offset views for all partitions of the topic in one request each.
                    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
                    Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(topicPartitions);

                    for (TopicPartition topicPartition : topicPartitions) {
                        Long endOffset = endOffsets.get(topicPartition);
                        // A null entry means the group has never committed an offset for this partition.
                        OffsetAndMetadata committed = committedOffsets.get(topicPartition);

                        System.out.println("Topic: " + topic);
                        System.out.println("  Partition: " + topicPartition.partition());
                        System.out.println("  End Offset: " + endOffset);
                        if (endOffset == null || committed == null) {
                            System.out.println("  Committed Offset: " + (committed == null ? "none" : String.valueOf(committed.offset())));
                            System.out.println("  Updates in Queue: unknown");
                        } else {
                            long committedOffset = committed.offset();
                            System.out.println("  Committed Offset: " + committedOffset);
                            System.out.println("  Updates in Queue: " + (endOffset - committedOffset));
                        }
                    }
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }