public void open()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java [218:291]


    public void open(Configuration configuration) throws Exception {
        if (schema instanceof KeyedSerializationSchemaWrapper) {
            ((KeyedSerializationSchemaWrapper<IN>) schema)
                    .getSerializationSchema()
                    .open(
                            RuntimeContextInitializationContextAdapters.serializationAdapter(
                                    getRuntimeContext(),
                                    metricGroup -> metricGroup.addGroup("user")));
        }
        producer = getKafkaProducer(this.producerConfig);

        RuntimeContext ctx = getRuntimeContext();

        if (null != flinkKafkaPartitioner) {
            flinkKafkaPartitioner.open(
                    ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
        }

        LOG.info(
                "Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}",
                ctx.getIndexOfThisSubtask() + 1,
                ctx.getNumberOfParallelSubtasks(),
                defaultTopicId);

        // register Kafka metrics to Flink accumulators
        if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
            Map<MetricName, ? extends Metric> metrics = this.producer.metrics();

            if (metrics == null) {
                // MapR's Kafka implementation returns null here.
                LOG.info("Producer implementation does not support metrics");
            } else {
                final MetricGroup kafkaMetricGroup =
                        getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
                for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
                    kafkaMetricGroup.gauge(
                            metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
                }
            }
        }

        if (flushOnCheckpoint
                && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
            LOG.warn(
                    "Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            flushOnCheckpoint = false;
        }

        if (logFailuresOnly) {
            callback =
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                LOG.error(
                                        "Error while sending record to Kafka: " + e.getMessage(),
                                        e);
                            }
                            acknowledgeMessage();
                        }
                    };
        } else {
            callback =
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            if (exception != null && asyncException == null) {
                                asyncException = exception;
                            }
                            acknowledgeMessage();
                        }
                    };
        }
    }