in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java [331:357]
public ProducerRecord<byte[], byte[]> serialize(
IN element, KafkaSinkContext context, Long timestamp) {
final String targetTopic = topicSelector.apply(element);
final byte[] value = valueSerializationSchema.serialize(element);
byte[] key = null;
if (keySerializationSchema != null) {
key = keySerializationSchema.serialize(element);
}
final OptionalInt partition =
partitioner != null
? OptionalInt.of(
partitioner.partition(
element,
key,
value,
targetTopic,
context.getPartitionsForTopic(targetTopic)))
: OptionalInt.empty();
return new ProducerRecord<>(
targetTopic,
partition.isPresent() ? partition.getAsInt() : null,
timestamp == null || timestamp < 0L ? null : timestamp,
key,
value,
headerProvider != null ? headerProvider.getHeaders(element) : null);
}