private void computeTargetDataRate()

in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java [188:241]


    /**
     * Computes the target and catch-up data rates of {@code vertex} and stores them in {@code out}
     * under {@code TARGET_DATA_RATE} and {@code CATCH_UP_DATA_RATE}.
     *
     * <p>For a source vertex the target rate is built from the {@code SOURCE_DATA_RATE} metric
     * (latest value plus the average over {@code metricsHistory}), and the catch-up rate is the
     * latest {@code LAG} (0 when absent) divided by the configured catch-up duration in seconds,
     * or 0 when that duration is 0.
     *
     * <p>For every other vertex both rates are the sum, over all inputs of the vertex, of the
     * input's already evaluated rate multiplied by the average output ratio of the connecting
     * edge.
     *
     * @param topology job topology used to classify the vertex and to look up its inputs
     * @param vertex vertex whose rates are computed
     * @param conf configuration providing {@code AutoScalerOptions.CATCH_UP_DURATION}
     * @param alreadyEvaluated evaluated metrics per vertex; read for each input of a non-source
     *     vertex, so it must already hold {@code TARGET_DATA_RATE} and {@code CATCH_UP_DATA_RATE}
     *     for those inputs
     * @param metricsHistory collected metric history passed to the averaging helpers
     * @param latestVertexMetrics most recent metric values of {@code vertex}
     * @param out destination map receiving the two evaluated metrics
     * @throws RuntimeException if {@code vertex} is a source and {@code latestVertexMetrics} has no
     *     {@code SOURCE_DATA_RATE} entry
     */
    private void computeTargetDataRate(
            JobTopology topology,
            JobVertexID vertex,
            Configuration conf,
            HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> alreadyEvaluated,
            SortedMap<Instant, CollectedMetrics> metricsHistory,
            Map<ScalingMetric, Double> latestVertexMetrics,
            Map<ScalingMetric, EvaluatedScalingMetric> out) {

        if (!topology.isSource(vertex)) {
            // Non-source vertex: accumulate the rates propagated from each upstream vertex,
            // scaled by the average output ratio of the edge leading into this vertex.
            double currentRateTotal = 0;
            double averageRateTotal = 0;
            double catchUpRateTotal = 0;

            for (var upstream : topology.getInputs().get(vertex)) {
                var upstreamMetrics = alreadyEvaluated.get(upstream);
                var upstreamTarget = upstreamMetrics.get(TARGET_DATA_RATE);
                var edgeRatio = getAverageOutputRatio(new Edge(upstream, vertex), metricsHistory);

                currentRateTotal += upstreamTarget.getCurrent() * edgeRatio;
                averageRateTotal += upstreamTarget.getAverage() * edgeRatio;

                var upstreamCatchUp = upstreamMetrics.get(CATCH_UP_DATA_RATE);
                catchUpRateTotal += upstreamCatchUp.getCurrent() * edgeRatio;
            }

            var aggregatedTarget = new EvaluatedScalingMetric(currentRateTotal, averageRateTotal);
            out.put(TARGET_DATA_RATE, aggregatedTarget);
            out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpRateTotal));
            return;
        }

        // Source vertex. The catch-up duration is read before the metric presence check so that
        // a configuration lookup failure surfaces first.
        final double catchUpSeconds = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();

        final boolean sourceRateKnown = latestVertexMetrics.containsKey(SOURCE_DATA_RATE);
        if (!sourceRateKnown) {
            throw new RuntimeException(
                    "Cannot evaluate metrics without source target rate information");
        }

        var sourceTarget =
                new EvaluatedScalingMetric(
                        latestVertexMetrics.get(SOURCE_DATA_RATE),
                        getAverage(SOURCE_DATA_RATE, vertex, metricsHistory));
        out.put(TARGET_DATA_RATE, sourceTarget);

        // Extra rate needed to work off the current backlog within the catch-up window; a
        // zero-length window yields no extra rate (and avoids dividing by zero).
        final double backlog = latestVertexMetrics.getOrDefault(LAG, 0.);
        final double backlogRate;
        if (catchUpSeconds == 0) {
            backlogRate = 0;
        } else {
            backlogRate = backlog / catchUpSeconds;
        }

        if (backlogRate > 0) {
            LOG.debug("Extra backlog processing input rate for {} is {}", vertex, backlogRate);
        }
        out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(backlogRate));
    }