private CollectedMetrics convertToScalingMetrics()

in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java [313:379]


    private CollectedMetrics convertToScalingMetrics(
            KEY jobKey,
            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics,
            Map<FlinkMetric, Metric> collectedJmMetrics,
            Map<FlinkMetric, AggregatedMetric> collectedTmMetrics,
            JobTopology jobTopology,
            Configuration conf) {

        var out = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();

        var finishedVertices = jobTopology.getFinishedVertices();
        if (!finishedVertices.isEmpty()) {
            collectedMetrics = new HashMap<>(collectedMetrics);
            for (JobVertexID v : finishedVertices) {
                collectedMetrics.put(v, FlinkMetric.FINISHED_METRICS);
            }
        }

        collectedMetrics.forEach(
                (jobVertexID, vertexFlinkMetrics) -> {
                    LOG.debug(
                            "Calculating vertex scaling metrics for {} from {}",
                            jobVertexID,
                            vertexFlinkMetrics);
                    var vertexScalingMetrics = new HashMap<ScalingMetric, Double>();
                    out.put(jobVertexID, vertexScalingMetrics);

                    if (jobTopology.isSource(jobVertexID)) {
                        ScalingMetrics.computeLagMetrics(vertexFlinkMetrics, vertexScalingMetrics);
                    }

                    ScalingMetrics.computeLoadMetrics(
                            jobVertexID,
                            vertexFlinkMetrics,
                            vertexScalingMetrics,
                            jobTopology.get(jobVertexID).getIoMetrics(),
                            conf);

                    var metricHistory =
                            histories.getOrDefault(jobKey, Collections.emptySortedMap());

                    ScalingMetrics.computeDataRateMetrics(
                            jobVertexID,
                            vertexFlinkMetrics,
                            vertexScalingMetrics,
                            jobTopology,
                            conf,
                            observedTprAvg(
                                    jobVertexID,
                                    metricHistory,
                                    conf.get(
                                            AutoScalerOptions
                                                    .OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS)));
                    vertexScalingMetrics
                            .entrySet()
                            .forEach(e -> e.setValue(ScalingMetrics.roundMetric(e.getValue())));

                    LOG.debug(
                            "Vertex scaling metrics for {}: {}", jobVertexID, vertexScalingMetrics);
                });

        var globalMetrics =
                ScalingMetrics.computeGlobalMetrics(collectedJmMetrics, collectedTmMetrics, conf);
        LOG.debug("Global metrics: {}", globalMetrics);

        return new CollectedMetrics(out, globalMetrics);
    }