fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/metrics/FlinkSourceReaderMetrics.java [100:126]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public void recordCurrentOffset(TableBucket tb, long offset) {
        checkTableBucketTracked(tb);
        offsets.put(tb, offset);
    }

    // -------- Helper functions --------
    private void registerOffsetMetricsForTableBucket(TableBucket tableBucket) {
        final MetricGroup metricGroup =
                tableBucket.getPartitionId() == null
                        ? this.flussSourceReaderMetricGroup
                        : this.flussSourceReaderMetricGroup.addGroup(
                                PARTITION_GROUP, String.valueOf(tableBucket.getPartitionId()));
        final MetricGroup bucketGroup =
                metricGroup.addGroup(BUCKET_GROUP, String.valueOf(tableBucket.getBucket()));
        bucketGroup.gauge(
                CURRENT_OFFSET_METRIC_GAUGE,
                () -> offsets.getOrDefault(tableBucket, INITIAL_OFFSET));
    }

    private void checkTableBucketTracked(TableBucket tableBucket) {
        if (!offsets.containsKey(tableBucket)) {
            LOG.warn("Offset metrics of TableBucket {} is not tracked", tableBucket);
        }
    }

    public SourceReaderMetricGroup getSourceReaderMetricGroup() {
        return sourceReaderMetricGroup;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/source/metrics/FlinkSourceReaderMetrics.java [106:132]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public void recordCurrentOffset(TableBucket tb, long offset) {
        checkTableBucketTracked(tb);
        offsets.put(tb, offset);
    }

    // -------- Helper functions --------
    private void registerOffsetMetricsForTableBucket(TableBucket tableBucket) {
        final MetricGroup metricGroup =
                tableBucket.getPartitionId() == null
                        ? this.flussSourceReaderMetricGroup
                        : this.flussSourceReaderMetricGroup.addGroup(
                                PARTITION_GROUP, String.valueOf(tableBucket.getPartitionId()));
        final MetricGroup bucketGroup =
                metricGroup.addGroup(BUCKET_GROUP, String.valueOf(tableBucket.getBucket()));
        bucketGroup.gauge(
                CURRENT_OFFSET_METRIC_GAUGE,
                () -> offsets.getOrDefault(tableBucket, INITIAL_OFFSET));
    }

    private void checkTableBucketTracked(TableBucket tableBucket) {
        if (!offsets.containsKey(tableBucket)) {
            LOG.warn("Offset metrics of TableBucket {} is not tracked", tableBucket);
        }
    }

    public SourceReaderMetricGroup getSourceReaderMetricGroup() {
        return sourceReaderMetricGroup;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



