in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java [78:144]
/**
 * Records one new scaling-metric observation for the resource's job and returns the resulting
 * metric history together with the job topology.
 *
 * <p>The in-memory history for the resource is seeded from {@code autoscalerInfo} when it is not
 * cached yet. The history is reset (both in {@code autoscalerInfo} and in memory) whenever the
 * job update timestamp derived from the job details is later than the oldest recorded
 * observation. Observations older than the configured metric window are then dropped. While the
 * current time is still inside the stabilization interval following the job update, no metrics
 * are queried and an empty history is returned alongside the topology.
 *
 * @param cr the Flink resource whose job is being observed
 * @param autoscalerInfo autoscaler state used to seed, clear and store the metric history
 * @param flinkService service used to fetch job details and metrics
 * @param conf configuration providing the metric window and the stabilization interval
 * @return the topology plus the metric history; the result is flagged as fully collected once a
 *     full metric window has elapsed since the first retained observation
 * @throws Exception if any of the underlying lookups or metric queries fail
 */
public CollectedMetricHistory updateMetrics(
        AbstractFlinkResource<?, ?> cr,
        AutoScalerInfo autoscalerInfo,
        FlinkService flinkService,
        Configuration conf)
        throws Exception {
    var resource = ResourceID.fromResource(cr);
    var job = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
    var observedAt = clock.instant();

    var history =
            histories.computeIfAbsent(resource, (unused) -> autoscalerInfo.getMetricHistory());

    // Collection counts as having started at the oldest retained observation; with nothing
    // recorded yet, it starts with this very observation.
    var collectionStart = observedAt;
    if (!history.isEmpty()) {
        collectionStart = history.firstKey();
    }

    var jobDetails = flinkService.getJobDetailsInfo(job, conf);
    var lastJobUpdate = getJobUpdateTs(jobDetails);
    if (lastJobUpdate.isAfter(collectionStart)) {
        // The recorded observations predate the latest job change, so start over.
        LOG.info("Job updated at {}. Clearing metrics.", lastJobUpdate);
        autoscalerInfo.clearMetricHistory();
        cleanup(cr);
        history.clear();
        collectionStart = observedAt;
    }

    var topology = getJobTopology(flinkService, conf, autoscalerInfo, jobDetails);

    // Forget every observation that has slid out of the metric window.
    var windowSize = getMetricWindowSize(conf);
    history.headMap(observedAt.minus(windowSize)).clear();

    var stabilizedAt = lastJobUpdate.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
    if (observedAt.isBefore(stabilizedAt)) {
        // Nothing is queried while the job is still stabilizing; note that the trimmed history
        // is not written back to autoscalerInfo on this path.
        LOG.info("Skipping metric collection during stabilization period until {}", stabilizedAt);
        return new CollectedMetricHistory(topology, Collections.emptySortedMap());
    }

    // Query per-vertex metric names, then their aggregated values, then derive scaling metrics.
    var metricNamesPerVertex = queryFilteredMetricNames(flinkService, cr, conf, topology);
    var aggregatedVertexMetrics =
            queryAllAggregatedMetrics(cr, flinkService, conf, metricNamesPerVertex);
    var latestScalingMetrics =
            convertToScalingMetrics(resource, aggregatedVertexMetrics, topology, conf);

    // The new observation is always appended and the whole history persisted.
    history.put(observedAt, latestScalingMetrics);
    autoscalerInfo.updateMetricHistory(history);

    var result = new CollectedMetricHistory(topology, history);
    var windowFilledAt = collectionStart.plus(windowSize);
    result.setFullyCollected(!observedAt.isBefore(windowFilledAt));
    if (!result.isFullyCollected()) {
        LOG.info("Metric window not full until {}", windowFilledAt);
    }
    return result;
}