private static double computeEdgeOutPerSecond()

in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java [178:226]


    /**
     * Estimates the records-per-second rate flowing over the edge {@code from -> to}.
     *
     * <p>Three strategies are tried in order:
     *
     * <ol>
     *   <li>If {@code to} has exactly one input, its num-records-in rate is returned directly.
     *   <li>If every other input of {@code to} has exactly one output, the sum of those inputs'
     *       num-records-out rates is subtracted from the num-records-in rate of {@code to}.
     *   <li>Otherwise the num-records-out rate of {@code from} is returned (least reliable).
     * </ol>
     *
     * @param topology job topology providing the input and output vertices of each vertex
     * @param flinkMetrics aggregated Flink metrics keyed by vertex
     * @param from upstream vertex of the edge
     * @param to downstream vertex of the edge
     * @return the estimated data rate of the edge in records per second
     */
    private static double computeEdgeOutPerSecond(
            JobTopology topology,
            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
            JobVertexID from,
            JobVertexID to) {
        var downstreamMetrics = flinkMetrics.get(to);
        var downstreamInputs = topology.getInputs().get(to);

        // Case 1: "from" is the only input of the downstream vertex, so the downstream num
        // records in (the most reliable measurement) is exactly the edge rate.
        if (downstreamInputs.size() == 1) {
            LOG.debug(
                    "Computing edge ({}, {}) data rate for single input downstream task", from, to);
            return getNumRecordsInPerSecond(downstreamMetrics, to, false);
        }

        // Case 2: all other inputs of the downstream vertex send their whole output to it, so
        // their out rates can be subtracted from the downstream in rate. NaN marks the sum as
        // unusable; a NaN rate reported for any sibling makes the sum unusable as well.
        double siblingOutRate = 0;
        for (JobVertexID sibling : downstreamInputs) {
            if (!sibling.equals(from)) {
                // Only the other input edges are summed, never the edge being computed.
                if (topology.getOutputs().get(sibling).size() != 1) {
                    // This input has multiple outputs, so its out rate cannot be attributed
                    // to a single edge.
                    siblingOutRate = Double.NaN;
                    break;
                }
                siblingOutRate += getNumRecordsOutPerSecond(flinkMetrics.get(sibling), sibling);
            }
        }

        boolean siblingRatesUsable = !Double.isNaN(siblingOutRate);
        if (siblingRatesUsable) {
            LOG.debug(
                    "Computing edge ({}, {}) data rate by subtracting upstream input rates",
                    from,
                    to);
            double downstreamInRate = getNumRecordsInPerSecond(downstreamMetrics, to, false);
            return downstreamInRate - siblingOutRate;
        }

        // Case 3: fall back to the num records out of the upstream vertex, which is the least
        // reliable option.
        var upstreamMetrics = flinkMetrics.get(from);
        LOG.debug(
                "Computing edge ({}, {}) data rate by falling back to from num records out",
                from,
                to);
        return getNumRecordsOutPerSecond(upstreamMetrics, from);
    }