in pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java [281:306]
private TypedMessageBuilder<byte[]> buildMessage(org.apache.pulsar.client.api.Producer<byte[]> producer, ProducerRecord<K, V> record) {
TypedMessageBuilder<byte[]> builder = producer.newMessage();
byte[] keyBytes = null;
if (record.key() != null) {
String key = getKey(record.topic(), record.key());
keyBytes = key.getBytes(StandardCharsets.UTF_8);
builder.key(key);
}
if (valueSchema instanceof PulsarKafkaSchema) {
((PulsarKafkaSchema<V>) valueSchema).setTopic(record.topic());
}
byte[] value = valueSchema.encode(record.value());
builder.value(value);
if (record.partition() != null) {
// Partition was explicitly set on the record
builder.property(KafkaMessageRouter.PARTITION_ID, record.partition().toString());
} else {
// Get the partition id from the partitioner
int partition = partitioner.partition(record.topic(), record.key(), keyBytes, record.value(), value, cluster);
builder.property(KafkaMessageRouter.PARTITION_ID, Integer.toString(partition));
}
return builder;
}