private void exposeProducerMetrics()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java [338:371]


    /**
     * Registers the statistics of the given producer as gauges when metrics are enabled in the
     * sink configuration; otherwise this method does nothing.
     *
     * <p>The gauges are placed in a nested metric group of the form {@code
     * PULSAR_PRODUCER_METRIC_NAME / <topic> / <producer identity>}. The producer identity is the
     * producer name, or a freshly generated random UUID string when the producer name is null or
     * empty.
     *
     * @param producer the producer whose stats should be exposed.
     */
    private void exposeProducerMetrics(Producer<?> producer) {
        if (!sinkConfiguration.isEnableMetrics()) {
            return;
        }

        // A producer without a name is identified by a random UUID instead
        // (the topic is already one level of the metric group hierarchy below).
        String producerName = producer.getProducerName();
        String identity =
                Strings.isNullOrEmpty(producerName) ? UUID.randomUUID().toString() : producerName;

        MetricGroup producerMetrics =
                metricGroup
                        .addGroup(PULSAR_PRODUCER_METRIC_NAME)
                        .addGroup(producer.getTopic())
                        .addGroup(identity);

        // The stats object is fetched once; every gauge below reads from this same instance.
        ProducerStats producerStats = producer.getStats();

        // Counters and rates.
        producerMetrics.gauge(NUM_MSGS_SENT, producerStats::getNumMsgsSent);
        producerMetrics.gauge(NUM_BYTES_SENT, producerStats::getNumBytesSent);
        producerMetrics.gauge(NUM_SEND_FAILED, producerStats::getNumSendFailed);
        producerMetrics.gauge(NUM_ACKS_RECEIVED, producerStats::getNumAcksReceived);
        producerMetrics.gauge(SEND_MSGS_RATE, producerStats::getSendMsgsRate);
        producerMetrics.gauge(SEND_BYTES_RATE, producerStats::getSendBytesRate);

        // Send latency distribution.
        producerMetrics.gauge(SEND_LATENCY_MILLIS_50_PCT, producerStats::getSendLatencyMillis50pct);
        producerMetrics.gauge(SEND_LATENCY_MILLIS_75_PCT, producerStats::getSendLatencyMillis75pct);
        producerMetrics.gauge(SEND_LATENCY_MILLIS_95_PCT, producerStats::getSendLatencyMillis95pct);
        producerMetrics.gauge(SEND_LATENCY_MILLIS_99_PCT, producerStats::getSendLatencyMillis99pct);
        producerMetrics.gauge(
                SEND_LATENCY_MILLIS_999_PCT, producerStats::getSendLatencyMillis999pct);
        producerMetrics.gauge(SEND_LATENCY_MILLIS_MAX, producerStats::getSendLatencyMillisMax);

        // Totals and the pending queue.
        producerMetrics.gauge(TOTAL_MSGS_SENT, producerStats::getTotalMsgsSent);
        producerMetrics.gauge(TOTAL_BYTES_SENT, producerStats::getTotalBytesSent);
        producerMetrics.gauge(TOTAL_SEND_FAILED, producerStats::getTotalSendFailed);
        producerMetrics.gauge(TOTAL_ACKS_RECEIVED, producerStats::getTotalAcksReceived);
        producerMetrics.gauge(PENDING_QUEUE_SIZE, producerStats::getPendingQueueSize);
    }