in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [626:649]
public void seekToEnd(Collection<TopicPartition> partitions) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
if (partitions.isEmpty()) {
partitions = consumers.keySet();
}
lastCommittedOffset.clear();
lastReceivedOffset.clear();
for (TopicPartition tp : partitions) {
TopicPartition normalizedTp = normalizedTopicPartition(tp);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(normalizedTp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
} else {
futures.add(c.seekAsync(MessageId.latest));
unpolledPartitions.add(tp);
}
}
FutureUtil.waitForAll(futures).join();
}