private FlinkKafkaInternalProducer initProducer()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [1381:1419]


    /**
     * Creates a new producer via {@code createProducer()}, logs its start-up, and optionally
     * publishes the producer's Kafka metrics as gauges in the {@code "KafkaProducer"} metric
     * group of the runtime context.
     *
     * <p>Metrics are keyed by their plain name in {@code previouslyCreatedMetrics}. A name that
     * already has a wrapper is re-pointed at the new producer's metric instead of being
     * registered a second time; only unseen names get a new wrapper and gauge.
     *
     * @param registerMetrics whether the producer's metrics should be registered; registration
     *     is also skipped when the {@code KEY_DISABLE_METRICS} producer property parses to
     *     {@code true}
     * @return the newly created producer
     */
    private FlinkKafkaInternalProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
        final FlinkKafkaInternalProducer<byte[], byte[]> newProducer = createProducer();

        LOG.info(
                "Starting FlinkKafkaInternalProducer ({}/{}) to produce into default topic {}",
                getRuntimeContext().getIndexOfThisSubtask() + 1,
                getRuntimeContext().getNumberOfParallelSubtasks(),
                defaultTopicId);

        // Guard: metric registration was not requested, or is switched off in the configuration.
        if (!registerMetrics
                || Boolean.parseBoolean(
                        producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
            return newProducer;
        }

        // register Kafka metrics to Flink accumulators
        final Map<MetricName, ? extends Metric> kafkaMetrics = newProducer.metrics();
        if (kafkaMetrics == null) {
            // MapR's Kafka implementation returns null here.
            LOG.info("Producer implementation does not support metrics");
            return newProducer;
        }

        final MetricGroup producerGroup =
                getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
        for (Map.Entry<MetricName, ? extends Metric> metricEntry : kafkaMetrics.entrySet()) {
            final String metricName = metricEntry.getKey().name();
            final Metric kafkaMetric = metricEntry.getValue();

            final KafkaMetricMutableWrapper existing = previouslyCreatedMetrics.get(metricName);
            if (existing == null) {
                // TODO: somehow merge metrics from all active producers?
                final KafkaMetricMutableWrapper created =
                        new KafkaMetricMutableWrapper(kafkaMetric);
                previouslyCreatedMetrics.put(metricName, created);
                producerGroup.gauge(metricName, created);
            } else {
                // Already exposed as a gauge: just swap in the new producer's metric.
                existing.setKafkaMetric(kafkaMetric);
            }
        }
        return newProducer;
    }