public ListOffsetsResult listOffsets()

in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java [148:172]


    /**
     * Reports an offset for every requested topic partition, derived from the last message id
     * that the Pulsar admin API returns for the backing topic.
     *
     * <p>For a topic that {@code isPartitionedTopic} reports as partitioned, the lookup targets the
     * individual partition topic (topic name + {@code TopicName.PARTITIONED_TOPIC_SUFFIX} +
     * partition index); otherwise the plain topic name is used.
     *
     * <p>NOTE(review): the callback below never reads {@code entry.getValue()}, so the requested
     * {@link OffsetSpec} (earliest / latest / timestamp) is not consulted here and the last message
     * id is always what gets reported. The timestamp placed in each result is the wall-clock time
     * at which the lookup completed, not a message timestamp.
     *
     * @param topicPartitionOffsets partitions to query; only the keys are read by this method
     * @param listOffsetsOptions    not read by this method
     * @return a result wrapping a copy of the per-partition futures produced by {@code execute};
     *         each future completes with the computed offset, or exceptionally if the admin
     *         lookup or the offset conversion fails
     */
    public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
                                         ListOffsetsOptions listOffsetsOptions) {
        final Map<TopicPartition, KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo>> futures =
            execute(topicPartitionOffsets, (entry, future) -> {
                TopicPartition topicPartition = entry.getKey();
                String topicName = isPartitionedTopic(topicPartition.topic())
                        ? topicPartition.topic() + TopicName.PARTITIONED_TOPIC_SUFFIX + topicPartition.partition()
                        : topicPartition.topic();
                admin.topics()
                        .getLastMessageIdAsync(topicName)
                        .whenComplete((msgId, ex) -> {
                            if (ex != null) {
                                log.error("Admin failed to get lastMessageId for topic " + topicName, ex);
                                future.completeExceptionally(ex);
                                return;
                            }
                            // An exception thrown from this callback would only surface on the
                            // discarded future returned by whenComplete, leaving the caller's
                            // future incomplete forever. Route such failures to the future instead.
                            try {
                                long offset = getSequenceId(msgId);
                                future.complete(new ListOffsetsResult.ListOffsetsResultInfo(
                                        offset,
                                        System.currentTimeMillis(),
                                        Optional.empty()));
                            } catch (RuntimeException e) {
                                log.error("Failed to derive offset from lastMessageId for topic " + topicName, e);
                                future.completeExceptionally(e);
                            }
                        });
            });
        return new ListOffsetsResult(new HashMap<>(futures));
    }