pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [385:397]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                long offset = MessageIdUtils.getOffset(msgId);

                TopicPartition tp = new TopicPartition(topic, partition);
                if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
                	log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
                	resetOffsets(tp);
                }

                K key = getKey(topic, msg);
                if (valueSchema instanceof PulsarKafkaSchema) {
                    ((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
                }
                V value = valueSchema.decode(msg.getData());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [317:329]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                long offset = MessageIdUtils.getOffset(msgId);

                TopicPartition tp = new TopicPartition(topic, partition);
                if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
                    log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
                    resetOffsets(tp);
                }

                K key = getKey(topic, msg);
                if (valueSchema instanceof PulsarKafkaSchema) {
                    ((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
                }
                V value = valueSchema.decode(msg.getData());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



