fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/metrics/FlinkSourceReaderMetrics.java [44:91]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class FlinkSourceReaderMetrics {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceReaderMetrics.class);

    // Constants
    // FLUSS_METRIC_GROUP and READER_METRIC_GROUP name the two nested groups that the
    // constructor creates beneath the source reader metric group ("fluss" -> "reader").
    public static final String FLUSS_METRIC_GROUP = "fluss";
    public static final String READER_METRIC_GROUP = "reader";
    // NOTE(review): the next three names are presumably used when registering per-bucket
    // offset gauges; their usage is outside this section -- confirm against the rest of the
    // class.
    public static final String PARTITION_GROUP = "partition";
    public static final String BUCKET_GROUP = "bucket";
    public static final String CURRENT_OFFSET_METRIC_GAUGE = "currentOffset";

    // Value stored in the offsets map when a table bucket is first registered.
    public static final long INITIAL_OFFSET = -1;
    // Sentinel meaning "no event-time lag has been reported yet"; it is the initial value of
    // currentFetchEventTimeLag and is what reportRecordEventTime checks before registering
    // the gauge.
    public static final long UNINITIALIZED = -1;

    // Source reader metric group
    private final SourceReaderMetricGroup sourceReaderMetricGroup;

    // Metric group for registering Fluss specific reader metrics
    private final MetricGroup flussSourceReaderMetricGroup;

    // Map for tracking current consuming offsets, keyed by table bucket
    private final Map<TableBucket, Long> offsets = new HashMap<>();

    // For currentFetchEventTimeLag metric; holds the most recently reported lag and is read
    // by the gauge lambda registered in reportRecordEventTime
    private volatile long currentFetchEventTimeLag = UNINITIALIZED;

    public FlinkSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {
        this.sourceReaderMetricGroup = sourceReaderMetricGroup;
        this.flussSourceReaderMetricGroup =
                sourceReaderMetricGroup.addGroup(FLUSS_METRIC_GROUP).addGroup(READER_METRIC_GROUP);
    }

    /**
     * Records the latest fetch event-time lag and, when no lag had been stored before this
     * call, registers the {@code currentFetchEventTimeLag} gauge on the source reader metric
     * group.
     *
     * <p>NOTE(review): "no lag stored" is detected by comparing the field with {@link
     * #UNINITIALIZED}. A reported lag equal to that sentinel leaves the field looking unset,
     * so the following call would attempt the registration again.
     *
     * @param lag the lag value to expose through the gauge
     */
    public void reportRecordEventTime(long lag) {
        final boolean gaugeNotYetRegistered = currentFetchEventTimeLag == UNINITIALIZED;

        // Always store the value first, so that a freshly registered gauge never exposes the
        // uninitialized sentinel to a metric reporter.
        currentFetchEventTimeLag = lag;

        if (gaugeNotYetRegistered) {
            // Lazy registration: the gauge only appears once a lag has been reported.
            sourceReaderMetricGroup.gauge(
                    MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, () -> currentFetchEventTimeLag);
        }
    }

    public void registerTableBucket(TableBucket tableBucket) {
        offsets.put(tableBucket, INITIAL_OFFSET);
        registerOffsetMetricsForTableBucket(tableBucket);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/source/metrics/FlinkSourceReaderMetrics.java [45:92]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class FlinkSourceReaderMetrics {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceReaderMetrics.class);

    // Constants
    // FLUSS_METRIC_GROUP and READER_METRIC_GROUP name the two nested groups that the
    // constructor creates beneath the source reader metric group ("fluss" -> "reader").
    public static final String FLUSS_METRIC_GROUP = "fluss";
    public static final String READER_METRIC_GROUP = "reader";
    // NOTE(review): the next three names are presumably used when registering per-bucket
    // offset gauges; their usage is outside this section -- confirm against the rest of the
    // class.
    public static final String PARTITION_GROUP = "partition";
    public static final String BUCKET_GROUP = "bucket";
    public static final String CURRENT_OFFSET_METRIC_GAUGE = "currentOffset";

    // Value stored in the offsets map when a table bucket is first registered.
    public static final long INITIAL_OFFSET = -1;
    // Sentinel meaning "no event-time lag has been reported yet"; it is the initial value of
    // currentFetchEventTimeLag and is what reportRecordEventTime checks before registering
    // the gauge.
    public static final long UNINITIALIZED = -1;

    // Source reader metric group
    private final SourceReaderMetricGroup sourceReaderMetricGroup;

    // Metric group for registering Fluss specific reader metrics
    private final MetricGroup flussSourceReaderMetricGroup;

    // Map for tracking current consuming offsets, keyed by table bucket
    private final Map<TableBucket, Long> offsets = new HashMap<>();

    // For currentFetchEventTimeLag metric; holds the most recently reported lag and is read
    // by the gauge lambda registered in reportRecordEventTime
    private volatile long currentFetchEventTimeLag = UNINITIALIZED;

    public FlinkSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {
        this.sourceReaderMetricGroup = sourceReaderMetricGroup;
        this.flussSourceReaderMetricGroup =
                sourceReaderMetricGroup.addGroup(FLUSS_METRIC_GROUP).addGroup(READER_METRIC_GROUP);
    }

    /**
     * Records the latest fetch event-time lag and, when no lag had been stored before this
     * call, registers the {@code currentFetchEventTimeLag} gauge on the source reader metric
     * group.
     *
     * <p>NOTE(review): "no lag stored" is detected by comparing the field with {@link
     * #UNINITIALIZED}. A reported lag equal to that sentinel leaves the field looking unset,
     * so the following call would attempt the registration again.
     *
     * @param lag the lag value to expose through the gauge
     */
    public void reportRecordEventTime(long lag) {
        final boolean gaugeNotYetRegistered = currentFetchEventTimeLag == UNINITIALIZED;

        // Always store the value first, so that a freshly registered gauge never exposes the
        // uninitialized sentinel to a metric reporter.
        currentFetchEventTimeLag = lag;

        if (gaugeNotYetRegistered) {
            // Lazy registration: the gauge only appears once a lag has been reported.
            sourceReaderMetricGroup.gauge(
                    MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, () -> currentFetchEventTimeLag);
        }
    }

    public void registerTableBucket(TableBucket tableBucket) {
        offsets.put(tableBucket, INITIAL_OFFSET);
        registerOffsetMetricsForTableBucket(tableBucket);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



