in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [1381:1419]
/**
 * Creates a new internal Kafka producer via {@code createProducer()}, logs its start-up and,
 * if requested, exposes the producer's Kafka metrics as gauges in the {@code "KafkaProducer"}
 * metric group of the runtime context.
 *
 * <p>Metric wrappers are cached in {@code previouslyCreatedMetrics}, keyed by the Kafka metric
 * name. When a wrapper for a name already exists, it is re-pointed at the metric of the newly
 * created producer instead of registering a second gauge under the same name.
 *
 * @param registerMetrics whether the producer's metrics should be registered; registration is
 *     additionally skipped when {@code KEY_DISABLE_METRICS} parses to {@code true} in the
 *     producer configuration
 * @return the newly created producer
 */
private FlinkKafkaInternalProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
    final FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = createProducer();

    LOG.info(
            "Starting FlinkKafkaInternalProducer ({}/{}) to produce into default topic {}",
            getRuntimeContext().getIndexOfThisSubtask() + 1,
            getRuntimeContext().getNumberOfParallelSubtasks(),
            defaultTopicId);

    // Caller did not ask for metric registration: nothing more to do.
    if (!registerMetrics) {
        return kafkaProducer;
    }

    // Metric registration can also be switched off through the producer configuration.
    final boolean metricsDisabled =
            Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"));
    if (metricsDisabled) {
        return kafkaProducer;
    }

    // register Kafka metrics to Flink accumulators
    final Map<MetricName, ? extends Metric> producerMetrics = kafkaProducer.metrics();
    if (producerMetrics == null) {
        // MapR's Kafka implementation returns null here.
        LOG.info("Producer implementation does not support metrics");
        return kafkaProducer;
    }

    final MetricGroup producerMetricGroup =
            getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");

    for (Map.Entry<MetricName, ? extends Metric> metricEntry : producerMetrics.entrySet()) {
        final String metricName = metricEntry.getKey().name();
        final Metric kafkaMetric = metricEntry.getValue();
        final KafkaMetricMutableWrapper knownWrapper = previouslyCreatedMetrics.get(metricName);

        if (knownWrapper == null) {
            // First time this metric name is seen: wrap it, remember it, and expose a gauge.
            // TODO: somehow merge metrics from all active producers?
            final KafkaMetricMutableWrapper freshWrapper =
                    new KafkaMetricMutableWrapper(kafkaMetric);
            previouslyCreatedMetrics.put(metricName, freshWrapper);
            producerMetricGroup.gauge(metricName, freshWrapper);
        } else {
            // Gauge already registered: just swap in the metric of the new producer.
            knownWrapper.setKafkaMetric(kafkaMetric);
        }
    }

    return kafkaProducer;
}