private ProducerRecord toKafkaRecord()

in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java [183:206]


    /**
     * Converts a Pulsar message into the equivalent Kafka {@link ProducerRecord}.
     *
     * <p>The value is decoded with the Kafka deserializer obtained via {@code getDeserializer}
     * when {@code valueSchema} is a {@link PulsarKafkaSchema}; otherwise {@code valueSchema.decode}
     * is used directly.
     *
     * <p>Side effects: this method assigns {@code scheme}, {@code partitionID} and
     * {@code eventTime}. They are not declared in this method (presumably instance fields
     * consumed elsewhere in the class -- confirm before changing the assignments).
     *
     * @param message the Pulsar message to convert; it is cast to {@code MessageImpl} to reach
     *                its message metadata
     * @return a record carrying topic, partition, event time, key and value; if the partition id
     *         cannot be parsed as an integer, a record carrying only topic, key and value
     * @throws RuntimeException if the message's {@code schema} field cannot be read reflectively;
     *                          the original {@link IllegalAccessException} is attached as the cause
     */
    private ProducerRecord<K, V> toKafkaRecord(Message<byte[]> message) {
        V value;
        if (valueSchema instanceof PulsarKafkaSchema) {
            PulsarKafkaSchema<V> pulsarValueKafkaSchema = (PulsarKafkaSchema<V>) valueSchema;
            Deserializer valueDeserializer = getDeserializer(pulsarValueKafkaSchema.getKafkaSerializer());
            value = (V) valueDeserializer.deserialize(topic, message.getValue());
        } else {
            value = valueSchema.decode(message.getValue());
        }
        try {
            // The schema is read reflectively (forceAccess = true) from the message's "schema" field.
            scheme = (Schema<byte[]>) FieldUtils.readField(message, "schema", true);
            MessageMetadata messageMetadataBuilder = ((MessageImpl<byte[]>) message).getMessageBuilder();
            partitionID = getPartitionID(messageMetadataBuilder);
            eventTime = message.getEventTime();
            return new ProducerRecord<>(topic, Integer.parseInt(partitionID), eventTime,
                    deserializeKey(topic, message.getKey()), value);
        } catch (NumberFormatException e) {
            // If not able to parse partitionID, ignore it and build the record without
            // partition and timestamp.
            return new ProducerRecord<>(topic, deserializeKey(topic, message.getKey()), value);
        } catch (IllegalAccessException e) {
            String errorMessage = "Unable to get the schema of message due to " + e.getMessage();
            // Keep the original exception as the cause so the stack trace is not lost.
            log.error(errorMessage, e);
            throw new RuntimeException(errorMessage, e);
        }
    }