in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java [148:172]
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
ListOffsetsOptions listOffsetsOptions) {
final Map<TopicPartition, KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo>> futures =
execute(topicPartitionOffsets, (entry, future) -> {
TopicPartition topicPartition = entry.getKey();
String topicName = isPartitionedTopic(topicPartition.topic())
? topicPartition.topic() + TopicName.PARTITIONED_TOPIC_SUFFIX + topicPartition.partition()
: topicPartition.topic();
admin.topics()
.getLastMessageIdAsync(topicName)
.whenComplete((msgId, ex) -> {
if (ex == null) {
long offset = getSequenceId(msgId);
future.complete(new ListOffsetsResult.ListOffsetsResultInfo(
offset,
System.currentTimeMillis(),
Optional.empty()));
} else {
log.error("Admin failed to get lastMessageId for topic " + topicName, ex);
future.completeExceptionally(ex);
}
});
});
return new ListOffsetsResult(new HashMap<>(futures));
}