private void computeTargetDataRate()

in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java [321:373]


    private void computeTargetDataRate(
            JobTopology topology,
            JobVertexID vertex,
            Configuration conf,
            double inputRate,
            HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> alreadyEvaluated,
            SortedMap<Instant, CollectedMetrics> metricsHistory,
            Map<ScalingMetric, Double> latestVertexMetrics,
            Map<ScalingMetric, EvaluatedScalingMetric> out) {

        if (topology.isSource(vertex)) {
            double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();

            double lagRate = getRate(LAG, vertex, metricsHistory);
            double ingestionDataRate = Math.max(0, inputRate + lagRate);
            if (Double.isNaN(ingestionDataRate)) {
                throw new RuntimeException(
                        "Cannot evaluate metrics without ingestion rate information");
            }

            out.put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(ingestionDataRate));

            double lag = latestVertexMetrics.getOrDefault(LAG, 0.);
            double catchUpInputRate = catchUpTargetSec == 0 ? 0 : lag / catchUpTargetSec;
            if (catchUpInputRate > 0) {
                LOG.debug(
                        "Extra backlog processing input rate for {} is {}",
                        vertex,
                        catchUpInputRate);
            }
            out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpInputRate));
        } else {
            var inputs = topology.get(vertex).getInputs().keySet();
            double sumAvgTargetRate = 0;
            double sumCatchUpDataRate = 0;
            for (var inputVertex : inputs) {
                var inputEvaluatedMetrics = alreadyEvaluated.get(inputVertex);
                var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE);
                var outputRatio =
                        computeEdgeOutputRatio(inputVertex, vertex, topology, metricsHistory);
                LOG.debug(
                        "Computed output ratio for edge ({} -> {}) : {}",
                        inputVertex,
                        vertex,
                        outputRatio);
                sumAvgTargetRate += inputTargetRate.getAverage() * outputRatio;
                sumCatchUpDataRate +=
                        inputEvaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent() * outputRatio;
            }
            out.put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(sumAvgTargetRate));
            out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(sumCatchUpDataRate));
        }
    }