in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java [136:182]
public FlinkKafkaProducerBase(
String defaultTopicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
FlinkKafkaPartitioner<IN> customPartitioner) {
requireNonNull(defaultTopicId, "TopicID not set");
requireNonNull(serializationSchema, "serializationSchema not set");
requireNonNull(producerConfig, "producerConfig not set");
ClosureCleaner.clean(
customPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
ClosureCleaner.ensureSerializable(serializationSchema);
this.defaultTopicId = defaultTopicId;
this.schema = serializationSchema;
this.producerConfig = producerConfig;
this.flinkKafkaPartitioner = customPartitioner;
// set the producer configuration properties for kafka record key value serializers.
if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
} else {
LOG.warn(
"Overwriting the '{}' is not recommended",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
} else {
LOG.warn(
"Overwriting the '{}' is not recommended",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
}
// eagerly ensure that bootstrap servers are set.
if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
throw new IllegalArgumentException(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+ " must be supplied in the producer config properties.");
}
this.topicPartitionsMap = new HashMap<>();
}