in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java [325:355]
private void exposeConsumerMetrics(Consumer<byte[]> consumer) {
if (sourceConfiguration.isEnableMetrics()) {
String consumerIdentity = consumer.getConsumerName();
if (Strings.isNullOrEmpty(consumerIdentity)) {
consumerIdentity = UUID.randomUUID().toString();
}
MetricGroup group =
metricGroup
.addGroup(PULSAR_CONSUMER_METRIC_NAME)
.addGroup(consumer.getTopic())
.addGroup(consumerIdentity);
ConsumerStats stats = consumer.getStats();
group.gauge(NUM_MSGS_RECEIVED, stats::getNumMsgsReceived);
group.gauge(NUM_BYTES_RECEIVED, stats::getNumBytesReceived);
group.gauge(RATE_MSGS_RECEIVED, stats::getRateMsgsReceived);
group.gauge(RATE_BYTES_RECEIVED, stats::getRateBytesReceived);
group.gauge(NUM_ACKS_SENT, stats::getNumAcksSent);
group.gauge(NUM_ACKS_FAILED, stats::getNumAcksFailed);
group.gauge(NUM_RECEIVE_FAILED, stats::getNumReceiveFailed);
group.gauge(NUM_BATCH_RECEIVE_FAILED, stats::getNumBatchReceiveFailed);
group.gauge(TOTAL_MSGS_RECEIVED, stats::getTotalMsgsReceived);
group.gauge(TOTAL_BYTES_RECEIVED, stats::getTotalBytesReceived);
group.gauge(TOTAL_RECEIVED_FAILED, stats::getTotalReceivedFailed);
group.gauge(TOTAL_BATCH_RECEIVED_FAILED, stats::getTotaBatchReceivedFailed);
group.gauge(TOTAL_ACKS_SENT, stats::getTotalAcksSent);
group.gauge(TOTAL_ACKS_FAILED, stats::getTotalAcksFailed);
group.gauge(MSG_NUM_IN_RECEIVER_QUEUE, stats::getMsgNumInReceiverQueue);
}
}