pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java [375:387]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        // A value schema that is a PulsarKafkaSchema is given the record's topic
        // before it is asked to encode (presumably so a topic-aware serializer
        // can use it -- confirm against PulsarKafkaSchema).
        if (valueSchema instanceof PulsarKafkaSchema) {
            ((PulsarKafkaSchema<V>) valueSchema).setTopic(record.topic());
        }
        // Encode the record value through the value schema and set the result on
        // the message builder. The encoded bytes stay in a local because the
        // partitioner call below receives them alongside the original value.
        byte[] value = valueSchema.encode(record.value());
        builder.value(value);

        if (record.partition() != null) {
            // Partition was explicitly set on the record
            builder.property(KafkaMessageRouter.PARTITION_ID, record.partition().toString());
        } else {
            // Get the partition id from the partitioner
            int partition = partitioner.partition(record.topic(), record.key(), keyBytes, record.value(), value, cluster);
            builder.property(KafkaMessageRouter.PARTITION_ID, Integer.toString(partition));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java [291:303]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        // A value schema that is a PulsarKafkaSchema is given the record's topic
        // before it is asked to encode (presumably so a topic-aware serializer
        // can use it -- confirm against PulsarKafkaSchema).
        if (valueSchema instanceof PulsarKafkaSchema) {
            ((PulsarKafkaSchema<V>) valueSchema).setTopic(record.topic());
        }
        // Encode the record value through the value schema and set the result on
        // the message builder. The encoded bytes stay in a local because the
        // partitioner call below receives them alongside the original value.
        byte[] value = valueSchema.encode(record.value());
        builder.value(value);

        if (record.partition() != null) {
            // Partition was explicitly set on the record
            builder.property(KafkaMessageRouter.PARTITION_ID, record.partition().toString());
        } else {
            // Get the partition id from the partitioner
            int partition = partitioner.partition(record.topic(), record.key(), keyBytes, record.value(), value, cluster);
            builder.property(KafkaMessageRouter.PARTITION_ID, Integer.toString(partition));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



