in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java [338:371]
/**
 * Registers the statistics of the given producer as gauges on the sink's metric group.
 *
 * <p>Nothing is registered unless metrics are enabled in the sink configuration. The gauges are
 * placed in a group nested by the producer metric name, the producer's topic and the producer's
 * name. When the producer reports no name, a random UUID is used as its identity instead.
 *
 * <p>The stats object is obtained once from the producer and every gauge reads from that same
 * instance.
 *
 * @param producer the producer whose statistics should be exposed.
 */
private void exposeProducerMetrics(Producer<?> producer) {
    if (!sinkConfiguration.isEnableMetrics()) {
        return;
    }

    String identity = producer.getProducerName();
    if (Strings.isNullOrEmpty(identity)) {
        // The producer has no name; fall back to a random UUID as its identity.
        identity = UUID.randomUUID().toString();
    }

    // Group hierarchy: <producer metric name> -> <topic> -> <producer identity>.
    MetricGroup producersGroup = metricGroup.addGroup(PULSAR_PRODUCER_METRIC_NAME);
    MetricGroup topicGroup = producersGroup.addGroup(producer.getTopic());
    MetricGroup producerGroup = topicGroup.addGroup(identity);

    ProducerStats producerStats = producer.getStats();

    // Counters and rates.
    producerGroup.gauge(NUM_MSGS_SENT, producerStats::getNumMsgsSent);
    producerGroup.gauge(NUM_BYTES_SENT, producerStats::getNumBytesSent);
    producerGroup.gauge(NUM_SEND_FAILED, producerStats::getNumSendFailed);
    producerGroup.gauge(NUM_ACKS_RECEIVED, producerStats::getNumAcksReceived);
    producerGroup.gauge(SEND_MSGS_RATE, producerStats::getSendMsgsRate);
    producerGroup.gauge(SEND_BYTES_RATE, producerStats::getSendBytesRate);

    // Send latency percentiles and maximum.
    producerGroup.gauge(SEND_LATENCY_MILLIS_50_PCT, producerStats::getSendLatencyMillis50pct);
    producerGroup.gauge(SEND_LATENCY_MILLIS_75_PCT, producerStats::getSendLatencyMillis75pct);
    producerGroup.gauge(SEND_LATENCY_MILLIS_95_PCT, producerStats::getSendLatencyMillis95pct);
    producerGroup.gauge(SEND_LATENCY_MILLIS_99_PCT, producerStats::getSendLatencyMillis99pct);
    producerGroup.gauge(SEND_LATENCY_MILLIS_999_PCT, producerStats::getSendLatencyMillis999pct);
    producerGroup.gauge(SEND_LATENCY_MILLIS_MAX, producerStats::getSendLatencyMillisMax);

    // Totals and pending queue size.
    producerGroup.gauge(TOTAL_MSGS_SENT, producerStats::getTotalMsgsSent);
    producerGroup.gauge(TOTAL_BYTES_SENT, producerStats::getTotalBytesSent);
    producerGroup.gauge(TOTAL_SEND_FAILED, producerStats::getTotalSendFailed);
    producerGroup.gauge(TOTAL_ACKS_RECEIVED, producerStats::getTotalAcksReceived);
    producerGroup.gauge(PENDING_QUEUE_SIZE, producerStats::getPendingQueueSize);
}