private FlinkKafkaProducer()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [644:741]


    /**
     * Common private constructor that validates the arguments and prepares the producer
     * configuration.
     *
     * <p>Exactly one serialization style is selected: if {@code kafkaSchema} is non-null it is
     * used (and {@code keyedSchema} is ignored); otherwise {@code keyedSchema} must be non-null. A
     * {@code customPartitioner} is only accepted together with a {@code keyedSchema}.
     *
     * <p>Note that {@code producerConfig} is stored by reference and modified in place: missing
     * key/value serializer entries and a missing transaction timeout are filled in with defaults.
     *
     * @param defaultTopic the default topic id; must not be null
     * @param keyedSchema keyed serialization schema, used only when {@code kafkaSchema} is null
     * @param customPartitioner optional partitioner; must be null when {@code kafkaSchema} is set
     * @param kafkaSchema Kafka serialization schema; takes precedence over {@code keyedSchema}
     * @param producerConfig producer properties; must not be null and must contain {@link
     *     ProducerConfig#BOOTSTRAP_SERVERS_CONFIG}
     * @param semantic the delivery semantic; must not be null
     * @param kafkaProducersPoolSize size of the producers pool; must be greater than zero
     * @throws IllegalArgumentException if a custom partitioner is combined with a {@code
     *     kafkaSchema}, if neither schema is provided, or if the bootstrap servers are not
     *     configured
     */
    private FlinkKafkaProducer(
            String defaultTopic,
            KeyedSerializationSchema<IN> keyedSchema,
            FlinkKafkaPartitioner<IN> customPartitioner,
            KafkaSerializationSchema<IN> kafkaSchema,
            Properties producerConfig,
            FlinkKafkaProducer.Semantic semantic,
            int kafkaProducersPoolSize) {
        super(
                new FlinkKafkaProducer.TransactionStateSerializer(),
                new FlinkKafkaProducer.ContextStateSerializer());

        this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null");

        if (kafkaSchema != null) {
            // The KafkaSerializationSchema path does not use a keyed schema or a partitioner.
            this.keyedSchema = null;
            this.kafkaSchema = kafkaSchema;
            this.flinkKafkaPartitioner = null;
            ClosureCleaner.clean(
                    this.kafkaSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);

            if (customPartitioner != null) {
                throw new IllegalArgumentException(
                        "Custom partitioner can only be used when "
                                + "using a KeyedSerializationSchema or SerializationSchema.");
            }
        } else if (keyedSchema != null) {
            // The keyed schema path; the partitioner is optional here and may be null.
            this.kafkaSchema = null;
            this.keyedSchema = keyedSchema;
            this.flinkKafkaPartitioner = customPartitioner;
            ClosureCleaner.clean(
                    this.flinkKafkaPartitioner,
                    ExecutionConfig.ClosureCleanerLevel.RECURSIVE,
                    true);
            ClosureCleaner.clean(
                    this.keyedSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        } else {
            throw new IllegalArgumentException(
                    "You must provide either a KafkaSerializationSchema or a "
                            + "KeyedSerializationSchema.");
        }

        // From here on, this.producerConfig and the producerConfig parameter refer to the same
        // Properties instance, so the defaults set below are visible through both.
        this.producerConfig = checkNotNull(producerConfig, "producerConfig is null");
        this.semantic = checkNotNull(semantic, "semantic is null");
        this.kafkaProducersPoolSize = kafkaProducersPoolSize;
        checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty");

        // set the producer configuration properties for kafka record key value serializers.
        if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
            this.producerConfig.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    ByteArraySerializer.class.getName());
        } else {
            LOG.warn(
                    "Overwriting the '{}' is not recommended",
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
        }

        if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
            this.producerConfig.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    ByteArraySerializer.class.getName());
        } else {
            LOG.warn(
                    "Overwriting the '{}' is not recommended",
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
        }

        // eagerly ensure that bootstrap servers are set.
        if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
            throw new IllegalArgumentException(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
                            + " must be supplied in the producer config properties.");
        }

        // Fall back to the default transaction timeout when the user did not configure one.
        if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
            long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
            // The value is stored as an int below, so guard the narrowing cast.
            checkState(
                    timeout < Integer.MAX_VALUE && timeout > 0,
                    "timeout does not fit into 32 bit integer");
            this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
            LOG.warn(
                    "Property [{}] not specified. Setting it to {}",
                    ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
                    DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
        }

        // Enable transactionTimeoutWarnings to avoid silent data loss
        // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
        // The KafkaProducer may not throw an exception if the transaction failed to commit
        if (semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
            final long transactionTimeout = getTransactionTimeout(producerConfig);
            super.setTransactionTimeout(transactionTimeout);
            super.enableTransactionTimeoutWarnings(0.8);
        }

        this.topicPartitionsMap = new HashMap<>();
    }