in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java [183:206]
/**
 * Converts a Pulsar {@link Message} into the Kafka {@link ProducerRecord} form.
 *
 * <p>The value is decoded with the wrapped Kafka deserializer when {@code valueSchema} is a
 * {@link PulsarKafkaSchema}, and with {@code valueSchema.decode(...)} otherwise. The message's
 * {@code schema} field (read reflectively), the partition id and the event time are stored in
 * {@code scheme}, {@code partitionID} and {@code eventTime} as a side effect.
 *
 * @param message the Pulsar message to convert
 * @return a record carrying topic, partition, event time, key and value when the partition id
 *         parses as an integer; otherwise a record carrying only topic, key and value
 * @throws RuntimeException if the {@code schema} field of the message cannot be read
 *         reflectively; the originating {@link IllegalAccessException} is attached as the cause
 */
private ProducerRecord<K, V> toKafkaRecord(Message<byte[]> message) {
    V value;
    if (valueSchema instanceof PulsarKafkaSchema) {
        // The schema wraps a Kafka serializer: obtain the matching deserializer for the payload.
        PulsarKafkaSchema<V> pulsarValueKafkaSchema = (PulsarKafkaSchema<V>) valueSchema;
        Deserializer valueDeserializer = getDeserializer((pulsarValueKafkaSchema.getKafkaSerializer()));
        value = (V) valueDeserializer.deserialize(topic, message.getValue());
    } else {
        value = valueSchema.decode(message.getValue());
    }
    try {
        // NOTE(review): scheme, partitionID and eventTime are not declared in this method --
        // presumably instance fields consumed elsewhere; confirm before changing these writes.
        // forceAccess=true: the "schema" field is read regardless of its declared visibility.
        scheme = (Schema<byte[]>) FieldUtils.readField(message, "schema", true);
        MessageMetadata messageMetadataBuilder = ((MessageImpl<byte[]>) message).getMessageBuilder();
        partitionID = getPartitionID(messageMetadataBuilder);
        eventTime = message.getEventTime();
        return new ProducerRecord<>(topic, Integer.parseInt(partitionID), eventTime,
                deserializeKey(topic, message.getKey()), value);
    } catch (NumberFormatException e) {
        // If not able to parse partitionID, ignore it: fall back to a record without
        // partition (and therefore without the event-time timestamp).
        return new ProducerRecord<>(topic, deserializeKey(topic, message.getKey()), value);
    } catch (IllegalAccessException e) {
        String errorMessage = "Unable to get the schema of message due to " + e.getMessage();
        // Keep the original exception in both the log entry and the rethrown exception so the
        // stack trace of the reflective failure is not lost.
        log.error(errorMessage, e);
        throw new RuntimeException(errorMessage, e);
    }
}