public void open()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [808:855]


    /**
     * Prepares this sink for sending records.
     *
     * <p>The steps, in order:
     *
     * <ol>
     *   <li>Installs the send-completion {@code callback}. With {@code logFailuresOnly} set, a
     *       failed send is only logged; otherwise the first failure seen is stored in {@code
     *       asyncException}. In both modes every completion calls {@code acknowledgeMessage()}.
     *   <li>Opens the custom partitioner, if one is configured, with this subtask's index and the
     *       number of parallel subtasks.
     *   <li>If a serialization schema is configured, passes the same subtask information to it
     *       when it is {@link KafkaContextAware}, then opens it with an initialization context
     *       whose metrics are registered under the {@code "user"} group.
     *   <li>Delegates to {@code super.open(configuration)}.
     * </ol>
     *
     * @param configuration the configuration forwarded unchanged to {@code super.open}
     * @throws Exception if opening the partitioner, the schema, or the superclass fails
     */
    public void open(Configuration configuration) throws Exception {
        if (!logFailuresOnly) {
            // Strict mode: remember only the first failure; later ones are dropped.
            callback =
                    (recordMetadata, sendFailure) -> {
                        if (sendFailure != null && asyncException == null) {
                            asyncException = sendFailure;
                        }
                        acknowledgeMessage();
                    };
        } else {
            // Lenient mode: a failed send is reported in the log and nothing is stored.
            callback =
                    (recordMetadata, sendFailure) -> {
                        if (sendFailure != null) {
                            LOG.error(
                                    "Error while sending record to Kafka: "
                                            + sendFailure.getMessage(),
                                    sendFailure);
                        }
                        acknowledgeMessage();
                    };
        }

        final RuntimeContext runtimeContext = getRuntimeContext();
        final int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
        final int parallelism = runtimeContext.getNumberOfParallelSubtasks();

        if (flinkKafkaPartitioner != null) {
            flinkKafkaPartitioner.open(subtaskIndex, parallelism);
        }

        if (kafkaSchema != null) {
            // Context-aware schemas receive the subtask information before being opened.
            if (kafkaSchema instanceof KafkaContextAware) {
                KafkaContextAware<IN> contextAware = (KafkaContextAware<IN>) kafkaSchema;
                contextAware.setParallelInstanceId(subtaskIndex);
                contextAware.setNumParallelInstances(parallelism);
            }

            kafkaSchema.open(
                    RuntimeContextInitializationContextAdapters.serializationAdapter(
                            getRuntimeContext(), group -> group.addGroup("user")));
        }

        super.open(configuration);
    }