private CollectedMetrics convertToScalingMetrics()

in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java [229:286]


    /**
     * Translates the raw per-vertex Flink metrics into per-vertex scaling metrics and computes the
     * output ratios for the job.
     *
     * <p>Every vertex that {@code jobTopology} reports as finished has its entry set to {@code
     * FlinkMetric.FINISHED_METRICS}. This substitution is done on a shallow copy of {@code
     * collectedMetrics}, so the map handed in by the caller is never modified.
     *
     * <p>For each vertex the steps run in this order: lag metrics (only when the topology marks the
     * vertex as a source), load metrics, lag growth rate, data rate metrics, and finally every
     * computed value is passed through {@code ScalingMetrics.roundMetric}.
     *
     * @param resourceID resource identifier forwarded to {@code computeLagGrowthRate}
     * @param collectedMetrics Flink metrics keyed by job vertex
     * @param jobTopology topology used for finished/source lookups and for the data rate and output
     *     ratio computations
     * @param conf configuration forwarded to the load and data rate computations
     * @return the scaling metrics per vertex together with the computed output ratios
     */
    private CollectedMetrics convertToScalingMetrics(
            ResourceID resourceID,
            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics,
            JobTopology jobTopology,
            Configuration conf) {

        // Start from the caller's map; switch to a private copy only if finished vertices
        // need their metrics overridden.
        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetricsByVertex =
                collectedMetrics;
        var finished = jobTopology.getFinishedVertices();
        if (!finished.isEmpty()) {
            flinkMetricsByVertex = new HashMap<>(collectedMetrics);
            for (JobVertexID finishedVertex : finished) {
                flinkMetricsByVertex.put(finishedVertex, FlinkMetric.FINISHED_METRICS);
            }
        }

        var scalingMetricsByVertex = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();

        for (var vertexEntry : flinkMetricsByVertex.entrySet()) {
            var vertexId = vertexEntry.getKey();
            var flinkMetrics = vertexEntry.getValue();

            LOG.debug("Calculating vertex scaling metrics for {} from {}", vertexId, flinkMetrics);

            // Register the result map up front; the computations below fill it in place.
            var scalingMetrics = new HashMap<ScalingMetric, Double>();
            scalingMetricsByVertex.put(vertexId, scalingMetrics);

            if (jobTopology.isSource(vertexId)) {
                ScalingMetrics.computeLagMetrics(flinkMetrics, scalingMetrics);
            }

            ScalingMetrics.computeLoadMetrics(vertexId, flinkMetrics, scalingMetrics, conf);

            // The LAG entry is read after the lag/load steps and may be absent (null).
            double lagGrowthRate =
                    computeLagGrowthRate(
                            resourceID, vertexId, scalingMetrics.get(ScalingMetric.LAG));

            ScalingMetrics.computeDataRateMetrics(
                    vertexId, flinkMetrics, scalingMetrics, jobTopology, lagGrowthRate, conf);

            // Round every computed value in place.
            for (var metricEntry : scalingMetrics.entrySet()) {
                metricEntry.setValue(ScalingMetrics.roundMetric(metricEntry.getValue()));
            }

            LOG.debug("Vertex scaling metrics for {}: {}", vertexId, scalingMetrics);
        }

        // Output ratios are derived from the Flink metrics including the finished-vertex overrides.
        var outputRatios = ScalingMetrics.computeOutputRatios(flinkMetricsByVertex, jobTopology);
        LOG.debug("Output ratios: {}", outputRatios);
        return new CollectedMetrics(scalingMetricsByVertex, outputRatios);
    }