in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarKafkaSimpleConsumer.java [176:188]
public PulsarOffsetResponse getOffsetsBefore(PulsarOffsetRequest or) {
Map<TopicAndPartition, PartitionOffsetRequestInfo> request = or.getRequestInfo();
Map<TopicAndPartition, Long> offsetResoponse = Maps.newHashMap();
for (Entry<TopicAndPartition, PartitionOffsetRequestInfo> topicPartitionRequest : request.entrySet()) {
TopicAndPartition topic = topicPartitionRequest.getKey();
long time = topicPartitionRequest.getValue().time();
if (time != kafka.api.OffsetRequest.EarliestTime() && time != kafka.api.OffsetRequest.LatestTime()) {
throw new IllegalArgumentException("Time has to be from EarliestTime or LatestTime");
}
offsetResoponse.put(topic, time);
}
return new PulsarOffsetResponse(offsetResoponse);
}