in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [858:927]
public void invoke(
FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context)
throws FlinkKafkaException {
checkErroneous();
ProducerRecord<byte[], byte[]> record;
if (keyedSchema != null) {
byte[] serializedKey = keyedSchema.serializeKey(next);
byte[] serializedValue = keyedSchema.serializeValue(next);
String targetTopic = keyedSchema.getTargetTopic(next);
if (targetTopic == null) {
targetTopic = defaultTopicId;
}
Long timestamp = null;
if (this.writeTimestampToKafka) {
timestamp = context.timestamp();
}
int[] partitions = topicPartitionsMap.get(targetTopic);
if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, transaction.producer);
topicPartitionsMap.put(targetTopic, partitions);
}
if (flinkKafkaPartitioner != null) {
record =
new ProducerRecord<>(
targetTopic,
flinkKafkaPartitioner.partition(
next,
serializedKey,
serializedValue,
targetTopic,
partitions),
timestamp,
serializedKey,
serializedValue);
} else {
record =
new ProducerRecord<>(
targetTopic, null, timestamp, serializedKey, serializedValue);
}
} else if (kafkaSchema != null) {
if (kafkaSchema instanceof KafkaContextAware) {
@SuppressWarnings("unchecked")
KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) kafkaSchema;
String targetTopic = contextAwareSchema.getTargetTopic(next);
if (targetTopic == null) {
targetTopic = defaultTopicId;
}
int[] partitions = topicPartitionsMap.get(targetTopic);
if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, transaction.producer);
topicPartitionsMap.put(targetTopic, partitions);
}
contextAwareSchema.setPartitions(partitions);
}
record = kafkaSchema.serialize(next, context.timestamp());
} else {
throw new RuntimeException(
"We have neither KafkaSerializationSchema nor KeyedSerializationSchema, this"
+ "is a bug.");
}
pendingRecords.incrementAndGet();
transaction.producer.send(record, callback);
}