public void seekToBeginning()

in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [601:623]


    /**
     * Seeks the underlying Pulsar consumers for the given partitions to
     * {@link MessageId#earliest} and waits for all seek requests to finish.
     *
     * <p>An empty collection means "every partition currently held in
     * {@code consumers}". The locally tracked last-committed and last-received
     * offsets are cleared before any seek is issued.
     *
     * @param partitions the partitions to rewind; an empty collection selects all
     *                   partitions this consumer currently has a Pulsar consumer for
     * @throws IllegalArgumentException if {@code partitions} is {@code null}
     *         (mirrors the Kafka {@code Consumer#seekToBeginning} contract)
     */
    public void seekToBeginning(Collection<TopicPartition> partitions) {
        // Fail with the exception type the Kafka API documents instead of an
        // incidental NullPointerException from partitions.isEmpty(), and do so
        // before any tracked state is touched.
        if (partitions == null) {
            throw new IllegalArgumentException("Partitions collection cannot be null");
        }

        Collection<TopicPartition> targets = partitions.isEmpty() ? consumers.keySet() : partitions;

        // NOTE(review): both offset maps are cleared in full even when only a
        // subset of partitions is being sought — confirm this is intended.
        lastCommittedOffset.clear();
        lastReceivedOffset.clear();

        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (TopicPartition tp : targets) {
            // Consumers are looked up by the normalized form of the partition.
            TopicPartition normalizedTp = normalizedTopicPartition(tp);
            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(normalizedTp);
            if (c == null) {
                // Recorded as a failed future rather than thrown immediately, so
                // seeks for the remaining partitions are still issued; the
                // failure surfaces when the combined future is joined below.
                futures.add(FutureUtil.failedFuture(
                        new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
            } else {
                futures.add(c.seekAsync(MessageId.earliest));
                // NOTE(review): the caller-supplied tp (not normalizedTp) is
                // recorded here — verify against how unpolledPartitions is read.
                unpolledPartitions.add(tp);
            }
        }

        // Block until every seek has completed (or one of them has failed).
        FutureUtil.waitForAll(futures).join();
    }