pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java [308:318]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Produces the {@code String} form of a record key.
     *
     * <p>A key that is already a {@code String} is returned unchanged. Any other key
     * (including {@code null}, which is not an instance of {@code String}) is encoded
     * to bytes through {@code keySchema} and the bytes are rendered as Base64 text.
     *
     * @param topic topic name handed to {@code keySchema} when it is a {@code PulsarKafkaSchema}
     * @param key   record key to convert
     * @return the key itself when it is a {@code String}, otherwise the Base64 text of its encoded bytes
     */
    private String getKey(String topic, K key) {
        if (!(key instanceof String)) {
            // A PulsarKafkaSchema is told the topic before encoding
            // (presumably the wrapped serializer needs it -- confirm against PulsarKafkaSchema).
            if (keySchema instanceof PulsarKafkaSchema) {
                ((PulsarKafkaSchema) keySchema).setTopic(topic);
            }
            final byte[] encodedKey = keySchema.encode(key);
            return Base64.getEncoder().encodeToString(encodedKey);
        }
        // String keys need no serialization and are used verbatim.
        return (String) key;
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java [393:403]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Produces the {@code String} form of a record key.
     *
     * <p>A key that is already a {@code String} is returned unchanged. Any other key
     * (including {@code null}, which is not an instance of {@code String}) is encoded
     * to bytes through {@code keySchema} and the bytes are rendered as Base64 text.
     *
     * @param topic topic name handed to {@code keySchema} when it is a {@code PulsarKafkaSchema}
     * @param key   record key to convert
     * @return the key itself when it is a {@code String}, otherwise the Base64 text of its encoded bytes
     */
    private String getKey(String topic, K key) {
        // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
        if (key instanceof String) {
            return (String) key;
        }
        if (keySchema instanceof PulsarKafkaSchema) {
            // Parameterized cast instead of a raw type; the topic is set before encoding
            // (presumably the wrapped serializer needs it -- confirm against PulsarKafkaSchema).
            ((PulsarKafkaSchema<K>) keySchema).setTopic(topic);
        }
        byte[] keyBytes = keySchema.encode(key);
        return Base64.getEncoder().encodeToString(keyBytes);
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java [208:218]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Serializes a record key to its {@code String} form.
     *
     * <p>{@code String} keys pass through untouched. Every other key (including
     * {@code null}, which is not an instance of {@code String}) is encoded to bytes
     * with {@code keySchema} and returned as Base64 text.
     *
     * @param topic topic name handed to {@code keySchema} when it is a {@code PulsarKafkaSchema}
     * @param key   record key to serialize
     * @return the key itself when it is a {@code String}, otherwise the Base64 text of its encoded bytes
     */
    private String serializeKey(String topic, K key) {
        final String serialized;
        if (key instanceof String) {
            // Already textual: no encoding step required.
            serialized = (String) key;
        } else {
            if (keySchema instanceof PulsarKafkaSchema) {
                // The topic is set on the schema before it encodes the key.
                final PulsarKafkaSchema<K> topicAwareSchema = (PulsarKafkaSchema<K>) keySchema;
                topicAwareSchema.setTopic(topic);
            }
            final byte[] rawKey = keySchema.encode(key);
            serialized = Base64.getEncoder().encodeToString(rawKey);
        }
        return serialized;
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



