in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java [299:334]
public void invoke(IN next, Context context) throws Exception {
// propagate asynchronous errors
checkErroneous();
byte[] serializedKey = schema.serializeKey(next);
byte[] serializedValue = schema.serializeValue(next);
String targetTopic = schema.getTargetTopic(next);
if (targetTopic == null) {
targetTopic = defaultTopicId;
}
int[] partitions = this.topicPartitionsMap.get(targetTopic);
if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, producer);
this.topicPartitionsMap.put(targetTopic, partitions);
}
ProducerRecord<byte[], byte[]> record;
if (flinkKafkaPartitioner == null) {
record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
} else {
record =
new ProducerRecord<>(
targetTopic,
flinkKafkaPartitioner.partition(
next, serializedKey, serializedValue, targetTopic, partitions),
serializedKey,
serializedValue);
}
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
pendingRecords++;
}
}
producer.send(record, callback);
}