in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java [90:175]
/**
 * Takes one new sample of scaling metrics for the job described by {@code ctx}, appends it to
 * that job's metric history and writes the updated history back through {@code stateStore}.
 *
 * <p>The history is read from the {@code histories} cache and, on a cache miss, loaded from the
 * state store. When the job's running timestamp is later than the oldest recorded sample, the
 * existing samples are considered to belong to an earlier run of the job and are discarded
 * before the new sample is added.
 *
 * <p>The returned history is flagged as fully collected only when the sample time is not before
 * the window-full time; in that case samples older than the metric window are trimmed as well.
 *
 * @param ctx job context supplying the job key and configuration
 * @param stateStore store used to load, clear and persist the collected metrics
 * @return the job topology, the updated metric history and the job running timestamp
 * @throws Exception propagated from the job detail / metric queries or the state store; a
 *     failure while loading the history on a cache miss surfaces as a {@link RuntimeException}
 */
public CollectedMetricHistory updateMetrics(
        Context ctx, AutoScalerStateStore<KEY, Context> stateStore) throws Exception {
    var jobKey = ctx.getJobKey();
    var config = ctx.getConfiguration();
    var sampleTime = clock.instant();

    // The mapping function cannot throw checked exceptions, hence the unchecked wrapper.
    var history =
            histories.computeIfAbsent(
                    jobKey,
                    unusedKey -> {
                        try {
                            return stateStore.getCollectedMetrics(ctx);
                        } catch (Exception e) {
                            throw new RuntimeException("Get evaluated metrics failed.", e);
                        }
                    });

    var clientTimeout = config.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT);
    var jobDetails = getJobDetailsInfo(ctx, clientTimeout);
    var runningSince = getJobRunningTs(jobDetails);

    // A running timestamp newer than the oldest sample means the job changed after we started
    // collecting, so everything gathered so far is dropped.
    var historyIsStale = !history.isEmpty() && runningSince.isAfter(history.firstKey());
    if (historyIsStale) {
        LOG.info("Job updated at {}. Clearing metrics.", readable(runningSince));
        stateStore.removeCollectedMetrics(ctx);
        cleanup(ctx.getJobKey());
        history.clear();
    }

    var jobTopology = getJobTopology(ctx, stateStore, jobDetails);

    var stabilizationInterval = config.get(AutoScalerOptions.STABILIZATION_INTERVAL);
    var stabilizationEnd = runningSince.plus(stabilizationInterval);
    boolean stabilizing = sampleTime.isBefore(stabilizationEnd);

    // Work out when the metric window will be full, looking only at samples taken at or after
    // the end of the stabilization period.
    var windowSize = getMetricWindowSize(config);
    var samplesAfterStabilization = history.tailMap(stabilizationEnd);
    var windowFullAt = getWindowFullTime(samplesAfterStabilization, sampleTime, windowSize);

    // Query order: vertex metric names, aggregated vertex metrics, then JM and TM metrics.
    var vertexMetricNames = queryFilteredMetricNames(ctx, jobTopology, stabilizing);
    var vertexMetrics = queryAllAggregatedMetrics(ctx, vertexMetricNames);
    var jmMetrics = queryJmMetrics(ctx);
    var tmMetrics = queryTmMetrics(ctx);

    // Reaching this point means none of the queries threw; derive and record the sample.
    var sample =
            convertToScalingMetrics(
                    jobKey, vertexMetrics, jmMetrics, tmMetrics, jobTopology, config);
    history.put(sampleTime, sample);

    var result = new CollectedMetricHistory(jobTopology, history, runningSince);
    if (!sampleTime.isBefore(windowFullAt)) {
        result.setFullyCollected(true);
        // Window is full: discard samples that fall outside of it.
        var cutoff = sampleTime.minus(windowSize);
        int dropped = removeMetricsBefore(cutoff, history);
        LOG.debug(
                "Metric window is now full. Dropped {} samples before {}, keeping {}.",
                dropped,
                readable(cutoff),
                history.size());
    } else if (stabilizing) {
        LOG.info("Stabilizing until {}", readable(stabilizationEnd));
    } else {
        LOG.info(
                "Metric window is not full until {}. {} samples collected so far",
                readable(windowFullAt),
                history.size());
    }

    stateStore.storeCollectedMetrics(ctx, history);
    return result;
}