public FlinkKafkaProducerBase()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java [136:182]


    public FlinkKafkaProducerBase(
            String defaultTopicId,
            KeyedSerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            FlinkKafkaPartitioner<IN> customPartitioner) {
        requireNonNull(defaultTopicId, "TopicID not set");
        requireNonNull(serializationSchema, "serializationSchema not set");
        requireNonNull(producerConfig, "producerConfig not set");
        ClosureCleaner.clean(
                customPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        ClosureCleaner.ensureSerializable(serializationSchema);

        this.defaultTopicId = defaultTopicId;
        this.schema = serializationSchema;
        this.producerConfig = producerConfig;
        this.flinkKafkaPartitioner = customPartitioner;

        // set the producer configuration properties for kafka record key value serializers.
        if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
            this.producerConfig.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    ByteArraySerializer.class.getName());
        } else {
            LOG.warn(
                    "Overwriting the '{}' is not recommended",
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
        }

        if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
            this.producerConfig.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    ByteArraySerializer.class.getName());
        } else {
            LOG.warn(
                    "Overwriting the '{}' is not recommended",
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
        }

        // eagerly ensure that bootstrap servers are set.
        if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
            throw new IllegalArgumentException(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
                            + " must be supplied in the producer config properties.");
        }

        this.topicPartitionsMap = new HashMap<>();
    }