public PulsarOffsetResponse getOffsetsBefore()

in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarKafkaSimpleConsumer.java [176:188]


    public PulsarOffsetResponse getOffsetsBefore(PulsarOffsetRequest or) {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> request = or.getRequestInfo();
        Map<TopicAndPartition, Long> offsetResoponse = Maps.newHashMap();
        for (Entry<TopicAndPartition, PartitionOffsetRequestInfo> topicPartitionRequest : request.entrySet()) {
            TopicAndPartition topic = topicPartitionRequest.getKey();
            long time = topicPartitionRequest.getValue().time();
            if (time != kafka.api.OffsetRequest.EarliestTime() && time != kafka.api.OffsetRequest.LatestTime()) {
                throw new IllegalArgumentException("Time has to be from EarliestTime or LatestTime");
            }
            offsetResoponse.put(topic, time);
        }
        return new PulsarOffsetResponse(offsetResoponse);
    }