private void registerJMXMetrics()

in src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java [77:136]


    private void registerJMXMetrics(MetricsJmxReporter metricsJmxReporter) {
        MetricRegistry currentMetricRegistry = metricsJmxReporter.getMetricRegistry();

        // Lazily remove all registered metrics from the registry since this can be invoked during
        // partition reassignment
        LOG.debug(
                "Registering metrics existing:{}",
                metricsJmxReporter.getMetricRegistry().getMetrics().keySet());
        metricsJmxReporter.removeMetricsFromRegistry(String.valueOf(taskId));

        try {
            // Offset JMX
            currentMetricRegistry.register(
                    MetricsUtil.constructMetricName(
                            taskId, MetricsUtil.OFFSET_DOMAIN, MetricsUtil.COMMITTED_OFFSET),
                    (Gauge<Long>) committedOffset::get);

            // Total Processed JMX
            currentMetricRegistry.register(
                    MetricsUtil.constructMetricName(
                            taskId,
                            MetricsUtil.TOTAL_PROCESSED_DOMAIN,
                            MetricsUtil.TOTAL_LOAD_COUNT),
                    (Gauge<Long>) totalLoadCount::get);

            currentMetricRegistry.register(
                    MetricsUtil.constructMetricName(
                            taskId,
                            MetricsUtil.TOTAL_PROCESSED_DOMAIN,
                            MetricsUtil.TOTAL_RECORD_COUNT),
                    (Gauge<Long>) totalNumberOfRecord::get);

            currentMetricRegistry.register(
                    MetricsUtil.constructMetricName(
                            taskId,
                            MetricsUtil.TOTAL_PROCESSED_DOMAIN,
                            MetricsUtil.TOTAL_DATA_SIZE),
                    (Gauge<Long>) totalSizeOfData::get);

            // Buffer histogram JMX
            partitionBufferCountHistogram =
                    currentMetricRegistry.histogram(
                            MetricsUtil.constructMetricName(
                                    taskId,
                                    MetricsUtil.BUFFER_DOMAIN,
                                    MetricsUtil.BUFFER_RECORD_COUNT));
            partitionBufferSizeBytesHistogram =
                    currentMetricRegistry.histogram(
                            MetricsUtil.constructMetricName(
                                    taskId,
                                    MetricsUtil.BUFFER_DOMAIN,
                                    MetricsUtil.BUFFER_SIZE_BYTES));
            currentMetricRegistry.register(
                    MetricsUtil.constructMetricName(
                            taskId, MetricsUtil.BUFFER_DOMAIN, MetricsUtil.BUFFER_MEMORY_USAGE),
                    (Gauge<Long>) buffMemoryUsage::get);
        } catch (IllegalArgumentException ex) {
            LOG.warn("Metrics already present:{}", ex.getMessage());
        }
    }