in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [644:741]
private FlinkKafkaProducer(
String defaultTopic,
KeyedSerializationSchema<IN> keyedSchema,
FlinkKafkaPartitioner<IN> customPartitioner,
KafkaSerializationSchema<IN> kafkaSchema,
Properties producerConfig,
FlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize) {
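// State serializers for the transaction and context state that the
// two-phase-commit superclass snapshots for exactly-once recovery.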
super(
new FlinkKafkaProducer.TransactionStateSerializer(),
new FlinkKafkaProducer.ContextStateSerializer());
this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null");
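// Exactly one of kafkaSchema and keyedSchema may be supplied; the fields
// belonging to the other variant are explicitly nulled out below.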
if (kafkaSchema != null) {
this.keyedSchema = null;
this.kafkaSchema = kafkaSchema;
this.flinkKafkaPartitioner = null;
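// Strip non-serializable enclosing references so the schema can be
// serialized and shipped to the task managers.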
ClosureCleaner.clean(
this.kafkaSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
if (customPartitioner != null) {
throw new IllegalArgumentException(
"Custom partitioner can only be used when "
+ "using a KeyedSerializationSchema or SerializationSchema.");
}
} else if (keyedSchema != null) {
this.kafkaSchema = null;
this.keyedSchema = keyedSchema;
this.flinkKafkaPartitioner = customPartitioner;
ClosureCleaner.clean(
this.flinkKafkaPartitioner,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE,
true);
ClosureCleaner.clean(
this.keyedSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
} else {
throw new IllegalArgumentException(
"You must provide either a KafkaSerializationSchema or a "
+ "KeyedSerializationSchema.");
}
this.producerConfig = checkNotNull(producerConfig, "producerConfig is null");
this.semantic = checkNotNull(semantic, "semantic is null");
this.kafkaProducersPoolSize = kafkaProducersPoolSize;
checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be positive");
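// The producer pool is only exercised by the EXACTLY_ONCE semantic, where
// each concurrent checkpoint needs its own transactional producer.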
// Set the producer configuration properties for the Kafka record key/value serializers.
if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
} else {
LOG.warn(
"Overwriting the '{}' setting is not recommended.",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
} else {
LOG.warn(
"Overwriting the '{}' setting is not recommended.",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
}
// Eagerly ensure that bootstrap servers are set.
if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
throw new IllegalArgumentException(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+ " must be supplied in the producer config properties.");
}
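// Kafka parses transaction.timeout.ms as an int, hence the 32-bit check below.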
if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
checkState(
timeout < Integer.MAX_VALUE && timeout > 0,
"transaction timeout does not fit into a 32-bit integer");
this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
LOG.warn(
"Property [{}] not specified. Setting it to {}",
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
}
// Enable transactionTimeoutWarnings to avoid silent data loss
// See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
// The KafkaProducer may not throw an exception if the transaction failed to commit
if (semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
final long transactionTimeout = getTransactionTimeout(producerConfig);
super.setTransactionTimeout(transactionTimeout);
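// Warn once a transaction has been running for more than 80% of the timeout.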
super.enableTransactionTimeoutWarnings(0.8);
}
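// Lazily populated cache of the known partitions per topic.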
this.topicPartitionsMap = new HashMap<>();
}
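
A minimal usage sketch (not part of the file above): callers never reach this
private constructor directly; the public overloads delegate to it. This assumes
the public four-argument constructor that takes a KafkaSerializationSchema; the
topic name and bootstrap address are hypothetical placeholders.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {
public static FlinkKafkaProducer<String> createProducer() {
Properties props = new Properties();
// bootstrap.servers is mandatory; the constructor above fails fast without it.
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// The lambda implements KafkaSerializationSchema.serialize(element, timestamp).
KafkaSerializationSchema<String> schema =
(element, timestamp) ->
new ProducerRecord<>("my-topic", element.getBytes(StandardCharsets.UTF_8));
return new FlinkKafkaProducer<>(
"my-topic", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
}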