pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [149:166]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        if (keySchema == null) {
            Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            kafkaKeyDeserializer.configure(consumerConfig.originals(), true);
            this.keySchema = new PulsarKafkaSchema<>(kafkaKeyDeserializer);
        } else {
            this.keySchema = keySchema;
            consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }

        if (valueSchema == null) {
            Deserializer<V> kafkaValueDeserializer = consumerConfig.getConfiguredInstance(
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            kafkaValueDeserializer.configure(consumerConfig.originals(), true);
            this.valueSchema = new PulsarKafkaSchema<>(kafkaValueDeserializer);
        } else {
            this.valueSchema = valueSchema;
            consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [140:157]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        if (keySchema == null) {
            Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            kafkaKeyDeserializer.configure(consumerConfig.originals(), true);
            this.keySchema = new PulsarKafkaSchema<>(kafkaKeyDeserializer);
        } else {
            this.keySchema = keySchema;
            consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }

        if (valueSchema == null) {
            Deserializer<V> kafkaValueDeserializer = consumerConfig.getConfiguredInstance(
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            kafkaValueDeserializer.configure(consumerConfig.originals(), true);
            this.valueSchema = new PulsarKafkaSchema<>(kafkaValueDeserializer);
        } else {
            this.valueSchema = valueSchema;
            consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



