public CollectedMetricHistory updateMetrics()

in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java [90:175]


    /**
     * Collects a new metrics sample for the job behind {@code ctx}, appends it to that job's
     * metric history and persists the history through {@code stateStore}.
     *
     * <p>The history comes from the in-memory {@code histories} cache and is loaded from the state
     * store when the cache has no entry for the job. If the job's running timestamp is later than
     * the oldest sample in the history, the history is cleared (in the state store and locally)
     * before the new sample is added.
     *
     * <p>Once the current time is no longer before the computed window-full time, the result is
     * flagged as fully collected and samples older than the metric window are trimmed.
     *
     * @param ctx job context providing the job key and configuration
     * @param stateStore store used to load, clear and persist the collected metrics
     * @return the job topology, metric history and job running timestamp bundled together
     * @throws Exception propagated from the job lookups, metric queries or state store calls
     */
    public CollectedMetricHistory updateMetrics(
            Context ctx, AutoScalerStateStore<KEY, Context> stateStore) throws Exception {

        var jobKey = ctx.getJobKey();
        var config = ctx.getConfiguration();
        var collectionTime = clock.instant();

        // Use the cached history for this job, loading it from the state store on a cache miss.
        // The mapping function cannot throw checked exceptions, so failures are wrapped.
        var history =
                histories.computeIfAbsent(
                        jobKey,
                        missingKey -> {
                            try {
                                return stateStore.getCollectedMetrics(ctx);
                            } catch (Exception e) {
                                throw new RuntimeException("Get evaluated metrics failed.", e);
                            }
                        });

        var clientTimeout = config.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT);
        var jobDetails = getJobDetailsInfo(ctx, clientTimeout);
        var runningSince = getJobRunningTs(jobDetails);

        // A running timestamp newer than our oldest sample is treated as a job change, which
        // invalidates everything collected so far. The emptiness check must come first because
        // there is no first key to compare against in an empty history.
        final boolean jobChanged = !history.isEmpty() && runningSince.isAfter(history.firstKey());
        if (jobChanged) {
            LOG.info("Job updated at {}. Clearing metrics.", readable(runningSince));
            stateStore.removeCollectedMetrics(ctx);
            cleanup(ctx.getJobKey());
            history.clear();
        }

        var jobTopology = getJobTopology(ctx, stateStore, jobDetails);
        var stabilizationInterval = config.get(AutoScalerOptions.STABILIZATION_INTERVAL);
        var stabilizationEnd = runningSince.plus(stabilizationInterval);
        final boolean stabilizing = collectionTime.isBefore(stabilizationEnd);

        // Timestamp at which the metric window is full, derived from the samples recorded at or
        // after the end of the stabilization period
        var windowSize = getMetricWindowSize(config);
        var samplesSinceStable = history.tailMap(stabilizationEnd);
        var windowFullAt = getWindowFullTime(samplesSinceStable, collectionTime, windowSize);

        // Only the filtered metric names are queried for each vertex
        var vertexMetricNames = queryFilteredMetricNames(ctx, jobTopology, stabilizing);

        // Aggregated vertex metrics collected from Flink, followed by the JM and TM metrics
        var vertexMetrics = queryAllAggregatedMetrics(ctx, vertexMetricNames);
        var jmMetrics = queryJmMetrics(ctx);
        var tmMetrics = queryTmMetrics(ctx);

        // Turn the raw metrics into scaling metrics and record them as the sample for this run
        var sample =
                convertToScalingMetrics(
                        jobKey, vertexMetrics, jmMetrics, tmMetrics, jobTopology, config);
        history.put(collectionTime, sample);

        var result = new CollectedMetricHistory(jobTopology, history, runningSince);
        if (!collectionTime.isBefore(windowFullAt)) {
            result.setFullyCollected(true);
            // Drop samples that fell out of the metric window
            var cutoff = collectionTime.minus(windowSize);
            int dropped = removeMetricsBefore(cutoff, history);
            LOG.debug(
                    "Metric window is now full. Dropped {} samples before {}, keeping {}.",
                    dropped,
                    readable(cutoff),
                    history.size());
        } else if (stabilizing) {
            LOG.info("Stabilizing until {}", readable(stabilizationEnd));
        } else {
            LOG.info(
                    "Metric window is not full until {}. {} samples collected so far",
                    readable(windowFullAt),
                    history.size());
        }

        stateStore.storeCollectedMetrics(ctx, history);
        return result;
    }