fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/source/metrics/FlinkSourceReaderMetrics.java [112:123]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Registers the current-offset gauge for the given table bucket.
     *
     * <p>The gauge is attached to a {@code BUCKET_GROUP} sub-group keyed by the bucket. When the
     * bucket carries a partition id, that bucket group is nested under a {@code PARTITION_GROUP}
     * sub-group keyed by the partition id; otherwise it sits directly under the source reader
     * metric group.
     *
     * <p>On every read the gauge looks the bucket up in {@code offsets} and falls back to {@code
     * INITIAL_OFFSET} when no entry is present.
     *
     * @param tableBucket the bucket whose offset gauge should be registered
     */
    private void registerOffsetMetricsForTableBucket(TableBucket tableBucket) {
        // Start from the reader-level group and only descend into a partition group when the
        // bucket actually has a partition id.
        MetricGroup parentGroup = this.flussSourceReaderMetricGroup;
        if (tableBucket.getPartitionId() != null) {
            final String partitionKey = String.valueOf(tableBucket.getPartitionId());
            parentGroup = parentGroup.addGroup(PARTITION_GROUP, partitionKey);
        }

        final String bucketKey = String.valueOf(tableBucket.getBucket());
        final MetricGroup offsetGroup = parentGroup.addGroup(BUCKET_GROUP, bucketKey);

        // The lambda is evaluated lazily, so the gauge always reflects the latest tracked offset.
        offsetGroup.gauge(
                CURRENT_OFFSET_METRIC_GAUGE,
                () -> offsets.getOrDefault(tableBucket, INITIAL_OFFSET));
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/metrics/FlinkSourceReaderMetrics.java [106:117]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Registers the current-offset gauge for the given table bucket.
     *
     * <p>The gauge is attached to a {@code BUCKET_GROUP} sub-group keyed by the bucket. When the
     * bucket carries a partition id, that bucket group is nested under a {@code PARTITION_GROUP}
     * sub-group keyed by the partition id; otherwise it sits directly under the source reader
     * metric group.
     *
     * <p>On every read the gauge looks the bucket up in {@code offsets} and falls back to {@code
     * INITIAL_OFFSET} when no entry is present.
     *
     * @param tableBucket the bucket whose offset gauge should be registered
     */
    private void registerOffsetMetricsForTableBucket(TableBucket tableBucket) {
        // Start from the reader-level group and only descend into a partition group when the
        // bucket actually has a partition id.
        MetricGroup parentGroup = this.flussSourceReaderMetricGroup;
        if (tableBucket.getPartitionId() != null) {
            final String partitionKey = String.valueOf(tableBucket.getPartitionId());
            parentGroup = parentGroup.addGroup(PARTITION_GROUP, partitionKey);
        }

        final String bucketKey = String.valueOf(tableBucket.getBucket());
        final MetricGroup offsetGroup = parentGroup.addGroup(BUCKET_GROUP, bucketKey);

        // The lambda is evaluated lazily, so the gauge always reflects the latest tracked offset.
        offsetGroup.gauge(
                CURRENT_OFFSET_METRIC_GAUGE,
                () -> offsets.getOrDefault(tableBucket, INITIAL_OFFSET));
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



