in src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java [77:136]
/**
 * Registers this task's gauges and buffer histograms with the registry exposed by the given
 * reporter.
 *
 * <p>Before registering, {@code removeMetricsFromRegistry} is called with this task's id; the
 * method may be invoked again during partition reassignment, so entries left over from an
 * earlier registration are dropped first (the exact removal rule lives in
 * {@code MetricsJmxReporter} - presumably it matches on the task id; verify there).
 *
 * <p>Each metric is registered independently: a name clash on one metric is logged and does
 * not prevent the remaining metrics from being registered, nor the histogram fields from
 * being refreshed.
 *
 * @param metricsJmxReporter reporter that owns the {@link MetricRegistry} to register into
 */
private void registerJMXMetrics(MetricsJmxReporter metricsJmxReporter) {
    MetricRegistry currentMetricRegistry = metricsJmxReporter.getMetricRegistry();
    LOG.debug("Registering metrics existing:{}", currentMetricRegistry.getMetrics().keySet());
    // Clear this task's previous entries so the registrations below do not collide with them.
    metricsJmxReporter.removeMetricsFromRegistry(String.valueOf(taskId));

    // Offset JMX
    registerGaugeQuietly(
            currentMetricRegistry,
            MetricsUtil.constructMetricName(
                    taskId, MetricsUtil.OFFSET_DOMAIN, MetricsUtil.COMMITTED_OFFSET),
            committedOffset::get);

    // Total Processed JMX
    registerGaugeQuietly(
            currentMetricRegistry,
            MetricsUtil.constructMetricName(
                    taskId, MetricsUtil.TOTAL_PROCESSED_DOMAIN, MetricsUtil.TOTAL_LOAD_COUNT),
            totalLoadCount::get);
    registerGaugeQuietly(
            currentMetricRegistry,
            MetricsUtil.constructMetricName(
                    taskId, MetricsUtil.TOTAL_PROCESSED_DOMAIN, MetricsUtil.TOTAL_RECORD_COUNT),
            totalNumberOfRecord::get);
    registerGaugeQuietly(
            currentMetricRegistry,
            MetricsUtil.constructMetricName(
                    taskId, MetricsUtil.TOTAL_PROCESSED_DOMAIN, MetricsUtil.TOTAL_DATA_SIZE),
            totalSizeOfData::get);

    // Buffer histogram JMX. Each lookup is guarded separately so that a failure on one
    // histogram still lets the other field be refreshed.
    try {
        partitionBufferCountHistogram =
                currentMetricRegistry.histogram(
                        MetricsUtil.constructMetricName(
                                taskId,
                                MetricsUtil.BUFFER_DOMAIN,
                                MetricsUtil.BUFFER_RECORD_COUNT));
    } catch (IllegalArgumentException ex) {
        LOG.warn("Metrics already present:{}", ex.getMessage(), ex);
    }
    try {
        partitionBufferSizeBytesHistogram =
                currentMetricRegistry.histogram(
                        MetricsUtil.constructMetricName(
                                taskId,
                                MetricsUtil.BUFFER_DOMAIN,
                                MetricsUtil.BUFFER_SIZE_BYTES));
    } catch (IllegalArgumentException ex) {
        LOG.warn("Metrics already present:{}", ex.getMessage(), ex);
    }

    registerGaugeQuietly(
            currentMetricRegistry,
            MetricsUtil.constructMetricName(
                    taskId, MetricsUtil.BUFFER_DOMAIN, MetricsUtil.BUFFER_MEMORY_USAGE),
            buffMemoryUsage::get);
}

/**
 * Registers a single gauge, logging (rather than propagating) the
 * {@link IllegalArgumentException} raised when the name cannot be registered.
 *
 * @param registry registry to register the gauge with
 * @param metricName fully constructed metric name
 * @param gauge supplier of the gauge's current value
 */
private void registerGaugeQuietly(
        MetricRegistry registry, String metricName, Gauge<Long> gauge) {
    try {
        registry.register(metricName, gauge);
    } catch (IllegalArgumentException ex) {
        // Keep the original message; the trailing throwable preserves the stack trace.
        LOG.warn("Metrics already present:{}", ex.getMessage(), ex);
    }
}