public CollectedMetricHistory updateMetrics()

in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java [78:144]


    /**
     * Collects one round of scaling metrics for the given resource and returns the recorded metric
     * history together with the current job topology.
     *
     * <p>History source: the history is taken from the in-memory {@code histories} cache. When the
     * resource has no entry there, it is loaded via {@code autoscalerInfo.getMetricHistory()}.
     *
     * <p>Collection start and job updates: collection is considered to have started at the oldest
     * retained observation, or at the current instant when nothing has been recorded yet. If the
     * job reports an update later than that start, the following happens:
     *
     * <ul>
     *   <li>the stored history in {@code autoscalerInfo} is cleared;
     *   <li>{@code cleanup(cr)} is invoked;
     *   <li>the in-memory history is emptied;
     *   <li>collection restarts from the current instant.
     * </ul>
     *
     * <p>Window trimming: observations older than {@code now - metric window} are removed from the
     * history.
     *
     * <p>Stabilization: until {@link AutoScalerOptions#STABILIZATION_INTERVAL} has elapsed since the
     * last job update, no metrics are queried. The method then returns the topology with an empty
     * history.
     *
     * <p>Normal path: the filtered vertex metrics are queried, aggregated and converted into scaling
     * metrics. The result is recorded under the current instant and written back through {@code
     * autoscalerInfo.updateMetricHistory}.
     *
     * @param cr the Flink resource whose job is observed; its status supplies the job id
     * @param autoscalerInfo autoscaler state used to load, clear and store the metric history
     * @param flinkService service used to fetch job details and (via the helpers) topology/metrics
     * @param conf configuration from which the metric window and stabilization interval are read
     * @return the job topology plus the history map. On the normal path the map is passed as-is,
     *     without a copy here, and the result is marked fully collected once a full metric window
     *     has elapsed since collection started. On the stabilization path this method does not set
     *     the fully-collected flag.
     * @throws Exception propagated from the job-details, topology or metric query calls
     */
    public CollectedMetricHistory updateMetrics(
            AbstractFlinkResource<?, ?> cr,
            AutoScalerInfo autoscalerInfo,
            FlinkService flinkService,
            Configuration conf)
            throws Exception {

        var resource = ResourceID.fromResource(cr);
        var flinkJobId = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
        var currentTime = clock.instant();

        var history =
                histories.computeIfAbsent(resource, ignored -> autoscalerInfo.getMetricHistory());

        // Collection started with the oldest retained observation; an empty history means it
        // starts right now.
        var collectionStart = currentTime;
        if (!history.isEmpty()) {
            collectionStart = history.firstKey();
        }

        var jobDetails = flinkService.getJobDetailsInfo(flinkJobId, conf);
        var lastJobUpdate = getJobUpdateTs(jobDetails);
        if (lastJobUpdate.isAfter(collectionStart)) {
            // Observations recorded before the job changed are discarded; start over from now.
            LOG.info("Job updated at {}. Clearing metrics.", lastJobUpdate);
            autoscalerInfo.clearMetricHistory();
            cleanup(cr);
            history.clear();
            collectionStart = currentTime;
        }
        var topology = getJobTopology(flinkService, conf, autoscalerInfo, jobDetails);

        // headMap is exclusive, so only entries strictly older than the window start are dropped.
        var windowSize = getMetricWindowSize(conf);
        history.headMap(currentTime.minus(windowSize)).clear();

        var stabilizedAt = lastJobUpdate.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
        if (currentTime.isBefore(stabilizedAt)) {
            // Nothing is queried until the job has had time to settle after its last update.
            LOG.info(
                    "Skipping metric collection during stabilization period until {}",
                    stabilizedAt);
            return new CollectedMetricHistory(topology, Collections.emptySortedMap());
        }

        // Ask only for the metrics of interest per vertex, aggregate them, then derive the
        // scaling metrics from the aggregated values.
        var metricNamesPerVertex = queryFilteredMetricNames(flinkService, cr, conf, topology);
        var aggregatedVertexMetrics =
                queryAllAggregatedMetrics(cr, flinkService, conf, metricNamesPerVertex);
        var latestScalingMetrics =
                convertToScalingMetrics(resource, aggregatedVertexMetrics, topology, conf);

        history.put(currentTime, latestScalingMetrics);
        autoscalerInfo.updateMetricHistory(history);

        var windowEnd = collectionStart.plus(windowSize);
        var windowComplete = !currentTime.isBefore(windowEnd);

        var result = new CollectedMetricHistory(topology, history);
        result.setFullyCollected(windowComplete);
        if (!windowComplete) {
            LOG.info("Metric window not full until {}", windowEnd);
        }
        return result;
    }