in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java [229:286]
/**
 * Converts the raw Flink metrics collected per job vertex into scaling metrics.
 *
 * <p>Vertices that the topology reports as finished have their collected metrics replaced by
 * {@link FlinkMetric#FINISHED_METRICS}. The replacement is done on a copy, so the map passed in
 * by the caller is not modified. For every vertex in the resulting map the lag metrics (source
 * vertices only), load metrics, lag growth rate and data rate metrics are computed, and every
 * resulting value is passed through {@link ScalingMetrics#roundMetric}. Output ratios are
 * computed from the same (possibly substituted) metrics map.
 *
 * @param resourceID identifier of the resource, forwarded to the lag growth rate computation
 * @param collectedMetrics raw Flink metrics keyed by job vertex
 * @param jobTopology topology used to look up finished vertices and source vertices
 * @param conf configuration forwarded to the load and data rate computations
 * @return the per-vertex scaling metrics together with the computed output ratios
 */
private CollectedMetrics convertToScalingMetrics(
        ResourceID resourceID,
        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics,
        JobTopology jobTopology,
        Configuration conf) {

    // Decide which metrics map to work on: the caller's map as-is, or a copy in which the
    // finished vertices are overridden with the predefined finished metrics.
    var finishedVertices = jobTopology.getFinishedVertices();
    Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> effectiveMetrics;
    if (finishedVertices.isEmpty()) {
        effectiveMetrics = collectedMetrics;
    } else {
        effectiveMetrics = new HashMap<>(collectedMetrics);
        for (JobVertexID finishedVertex : finishedVertices) {
            effectiveMetrics.put(finishedVertex, FlinkMetric.FINISHED_METRICS);
        }
    }

    var scalingMetricsByVertex = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();
    for (var vertexEntry : effectiveMetrics.entrySet()) {
        var vertex = vertexEntry.getKey();
        var flinkMetrics = vertexEntry.getValue();
        LOG.debug("Calculating vertex scaling metrics for {} from {}", vertex, flinkMetrics);

        var scalingMetrics = new HashMap<ScalingMetric, Double>();
        scalingMetricsByVertex.put(vertex, scalingMetrics);

        // Lag metrics are only computed for source vertices.
        if (jobTopology.isSource(vertex)) {
            ScalingMetrics.computeLagMetrics(flinkMetrics, scalingMetrics);
        }
        ScalingMetrics.computeLoadMetrics(vertex, flinkMetrics, scalingMetrics, conf);

        // The LAG entry is read after the lag/load computations above; it may be absent
        // (null) here, e.g. presumably for non-source vertices - computeLagGrowthRate is
        // expected to cope with that (NOTE(review): confirm against its implementation).
        Double currentLag = scalingMetrics.get(ScalingMetric.LAG);
        double lagGrowthRate = computeLagGrowthRate(resourceID, vertex, currentLag);

        ScalingMetrics.computeDataRateMetrics(
                vertex, flinkMetrics, scalingMetrics, jobTopology, lagGrowthRate, conf);

        // Round every computed value in place.
        for (var metricEntry : scalingMetrics.entrySet()) {
            metricEntry.setValue(ScalingMetrics.roundMetric(metricEntry.getValue()));
        }
        LOG.debug("Vertex scaling metrics for {}: {}", vertex, scalingMetrics);
    }

    var outputRatios = ScalingMetrics.computeOutputRatios(effectiveMetrics, jobTopology);
    LOG.debug("Output ratios: {}", outputRatios);
    return new CollectedMetrics(scalingMetricsByVertex, outputRatios);
}