fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/metrics/FlinkMetricRegistry.java [101:128]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Collects the variables of the given group together with those of all its ancestors.
     *
     * <p>The parent chain is resolved first, so entries contributed by ancestors come first in
     * the iteration order of the result. When the group itself reports a key that is already
     * present from an ancestor, the ancestor's value is kept and the group's value is ignored.
     *
     * @param group the metric group to resolve variables for; may be {@code null}
     * @return an empty map if {@code group} is {@code null}, otherwise a new {@link
     *     LinkedHashMap} holding the merged variables
     */
    private Map<String, String> getVariables(AbstractMetricGroup group) {
        if (group != null) {
            // Resolve the ancestors first so that their entries take precedence.
            Map<String, String> inherited = getVariables(group.getParent());
            Map<String, String> merged = new LinkedHashMap<>(inherited);
            for (Map.Entry<String, String> entry : group.getAllVariables().entrySet()) {
                String name = entry.getKey();
                if (merged.containsKey(name)) {
                    // already defined further up the hierarchy; keep that value
                    continue;
                }
                merged.put(name, entry.getValue());
            }
            return merged;
        }
        // reached past the root of the hierarchy: nothing to contribute
        return Collections.emptyMap();
    }

    private void registerMetric(MetricGroup metricGroup, Metric metric, String metricName) {
        switch (metric.getMetricType()) {
            case COUNTER:
                metricGroup.counter(metricName, new FlinkCounter((Counter) metric));
                break;
            case METER:
                metricGroup.meter(metricName, new FlinkMeter((Meter) metric));
                break;
            case GAUGE:
                metricGroup.gauge(metricName, new FlinkGauge<>((Gauge<?>) metric));
                break;
            case HISTOGRAM:
                metricGroup.histogram(metricName, new FlinkHistogram((Histogram) metric));
                break;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/source/metrics/FlinkMetricRegistry.java [85:112]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Collects the variables of the given group together with those of all its ancestors.
     *
     * <p>The parent chain is resolved first, so entries contributed by ancestors come first in
     * the iteration order of the result. When the group itself reports a key that is already
     * present from an ancestor, the ancestor's value is kept and the group's value is ignored.
     *
     * @param group the metric group to resolve variables for; may be {@code null}
     * @return an empty map if {@code group} is {@code null}, otherwise a new {@link
     *     LinkedHashMap} holding the merged variables
     */
    private Map<String, String> getVariables(AbstractMetricGroup group) {
        if (group != null) {
            // Resolve the ancestors first so that their entries take precedence.
            Map<String, String> inherited = getVariables(group.getParent());
            Map<String, String> merged = new LinkedHashMap<>(inherited);
            for (Map.Entry<String, String> entry : group.getAllVariables().entrySet()) {
                String name = entry.getKey();
                if (merged.containsKey(name)) {
                    // already defined further up the hierarchy; keep that value
                    continue;
                }
                merged.put(name, entry.getValue());
            }
            return merged;
        }
        // reached past the root of the hierarchy: nothing to contribute
        return Collections.emptyMap();
    }

    private void registerMetric(MetricGroup metricGroup, Metric metric, String metricName) {
        switch (metric.getMetricType()) {
            case COUNTER:
                metricGroup.counter(metricName, new FlinkCounter((Counter) metric));
                break;
            case METER:
                metricGroup.meter(metricName, new FlinkMeter((Meter) metric));
                break;
            case GAUGE:
                metricGroup.gauge(metricName, new FlinkGauge<>((Gauge<?>) metric));
                break;
            case HISTOGRAM:
                metricGroup.histogram(metricName, new FlinkHistogram((Histogram) metric));
                break;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



