in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java [313:379]
private CollectedMetrics convertToScalingMetrics(
KEY jobKey,
Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics,
Map<FlinkMetric, Metric> collectedJmMetrics,
Map<FlinkMetric, AggregatedMetric> collectedTmMetrics,
JobTopology jobTopology,
Configuration conf) {
var out = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();
var finishedVertices = jobTopology.getFinishedVertices();
if (!finishedVertices.isEmpty()) {
collectedMetrics = new HashMap<>(collectedMetrics);
for (JobVertexID v : finishedVertices) {
collectedMetrics.put(v, FlinkMetric.FINISHED_METRICS);
}
}
collectedMetrics.forEach(
(jobVertexID, vertexFlinkMetrics) -> {
LOG.debug(
"Calculating vertex scaling metrics for {} from {}",
jobVertexID,
vertexFlinkMetrics);
var vertexScalingMetrics = new HashMap<ScalingMetric, Double>();
out.put(jobVertexID, vertexScalingMetrics);
if (jobTopology.isSource(jobVertexID)) {
ScalingMetrics.computeLagMetrics(vertexFlinkMetrics, vertexScalingMetrics);
}
ScalingMetrics.computeLoadMetrics(
jobVertexID,
vertexFlinkMetrics,
vertexScalingMetrics,
jobTopology.get(jobVertexID).getIoMetrics(),
conf);
var metricHistory =
histories.getOrDefault(jobKey, Collections.emptySortedMap());
ScalingMetrics.computeDataRateMetrics(
jobVertexID,
vertexFlinkMetrics,
vertexScalingMetrics,
jobTopology,
conf,
observedTprAvg(
jobVertexID,
metricHistory,
conf.get(
AutoScalerOptions
.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS)));
vertexScalingMetrics
.entrySet()
.forEach(e -> e.setValue(ScalingMetrics.roundMetric(e.getValue())));
LOG.debug(
"Vertex scaling metrics for {}: {}", jobVertexID, vertexScalingMetrics);
});
var globalMetrics =
ScalingMetrics.computeGlobalMetrics(collectedJmMetrics, collectedTmMetrics, conf);
LOG.debug("Global metrics: {}", globalMetrics);
return new CollectedMetrics(out, globalMetrics);
}