private void exposeConsumerMetrics()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java [325:355]


    /**
     * Registers the statistics of the given consumer as gauges, provided metrics are enabled in
     * the source configuration; otherwise this method does nothing.
     *
     * <p>The gauges are placed in a metric group nested three levels below {@code metricGroup}:
     * {@code PULSAR_CONSUMER_METRIC_NAME}, then the consumer's topic, then the consumer's name. If
     * the consumer name is null or empty, a random UUID string is used for the last level instead.
     *
     * <p>{@code consumer.getStats()} is called once here; every gauge is bound to an accessor of
     * that single returned {@code ConsumerStats} object.
     *
     * @param consumer the consumer whose statistics should be exposed.
     */
    private void exposeConsumerMetrics(Consumer<byte[]> consumer) {
        // Nothing to register when the user has not enabled metrics.
        if (!sourceConfiguration.isEnableMetrics()) {
            return;
        }

        // Fall back to a random identifier when the consumer carries no name.
        final String configuredName = consumer.getConsumerName();
        final String identity =
                Strings.isNullOrEmpty(configuredName)
                        ? UUID.randomUUID().toString()
                        : configuredName;

        // Build the nested scope: <consumer metric name> / <topic> / <consumer identity>.
        final MetricGroup connectorScope = metricGroup.addGroup(PULSAR_CONSUMER_METRIC_NAME);
        final MetricGroup topicScope = connectorScope.addGroup(consumer.getTopic());
        final MetricGroup consumerScope = topicScope.addGroup(identity);

        final ConsumerStats consumerStats = consumer.getStats();

        // Counters and rates.
        consumerScope.gauge(NUM_MSGS_RECEIVED, consumerStats::getNumMsgsReceived);
        consumerScope.gauge(NUM_BYTES_RECEIVED, consumerStats::getNumBytesReceived);
        consumerScope.gauge(RATE_MSGS_RECEIVED, consumerStats::getRateMsgsReceived);
        consumerScope.gauge(RATE_BYTES_RECEIVED, consumerStats::getRateBytesReceived);
        consumerScope.gauge(NUM_ACKS_SENT, consumerStats::getNumAcksSent);
        consumerScope.gauge(NUM_ACKS_FAILED, consumerStats::getNumAcksFailed);
        consumerScope.gauge(NUM_RECEIVE_FAILED, consumerStats::getNumReceiveFailed);
        consumerScope.gauge(NUM_BATCH_RECEIVE_FAILED, consumerStats::getNumBatchReceiveFailed);

        // Totals. Note: "getTotaBatchReceivedFailed" is the accessor's actual spelling.
        consumerScope.gauge(TOTAL_MSGS_RECEIVED, consumerStats::getTotalMsgsReceived);
        consumerScope.gauge(TOTAL_BYTES_RECEIVED, consumerStats::getTotalBytesReceived);
        consumerScope.gauge(TOTAL_RECEIVED_FAILED, consumerStats::getTotalReceivedFailed);
        consumerScope.gauge(
                TOTAL_BATCH_RECEIVED_FAILED, consumerStats::getTotaBatchReceivedFailed);
        consumerScope.gauge(TOTAL_ACKS_SENT, consumerStats::getTotalAcksSent);
        consumerScope.gauge(TOTAL_ACKS_FAILED, consumerStats::getTotalAcksFailed);

        // Receiver queue size.
        consumerScope.gauge(MSG_NUM_IN_RECEIVER_QUEUE, consumerStats::getMsgNumInReceiverQueue);
    }