in pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [578:595]
private K getKey(String topic, Message<byte[]> msg) {
if (!msg.hasKey()) {
return null;
}
if (keySchema instanceof PulsarKafkaSchema) {
PulsarKafkaSchema<K> pulsarKafkaSchema = (PulsarKafkaSchema) keySchema;
Deserializer<K> kafkaDeserializer = pulsarKafkaSchema.getKafkaDeserializer();
if (kafkaDeserializer instanceof StringDeserializer) {
return (K) msg.getKey();
}
pulsarKafkaSchema.setTopic(topic);
}
// Assume base64 encoding
byte[] data = Base64.getDecoder().decode(msg.getKey());
return keySchema.decode(data);
}