in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java [178:226]
/**
 * Estimates the records-per-second flowing along the edge ({@code from} -> {@code to}),
 * using the most reliable metric source available for the given topology shape.
 *
 * <p>Resolution order:
 * <ol>
 *   <li>{@code to} has a single input: use its numRecordsIn rate directly (most reliable).
 *   <li>Every other input of {@code to} has exactly one output: subtract their summed
 *       output rates from {@code to}'s input rate.
 *   <li>Fallback: use {@code from}'s total numRecordsOut rate (least reliable, since it
 *       may feed vertices other than {@code to}).
 * </ol>
 *
 * @param topology job graph used to look up the inputs/outputs of each vertex
 * @param flinkMetrics aggregated metrics keyed by vertex id
 * @param from upstream vertex of the edge
 * @param to downstream vertex of the edge
 * @return estimated data rate for the edge in records per second
 */
private static double computeEdgeOutPerSecond(
        JobTopology topology,
        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
        JobVertexID from,
        JobVertexID to) {

    var destinationMetrics = flinkMetrics.get(to);
    var inputsOfDestination = topology.getInputs().get(to);

    // Case 1: 'from' is the only input of 'to', so the downstream vertex's
    // numRecordsIn is exactly this edge's rate.
    if (inputsOfDestination.size() == 1) {
        LOG.debug(
                "Computing edge ({}, {}) data rate for single input downstream task", from, to);
        return getNumRecordsInPerSecond(destinationMetrics, to, false);
    }

    // Case 2: if every other input of 'to' writes to 'to' exclusively, their full
    // output rates are attributable to 'to' and can be subtracted from its input rate.
    double otherInputsRate = 0;
    for (var upstream : inputsOfDestination) {
        if (upstream.equals(from)) {
            // Skip the edge under computation; we only sum the sibling inputs.
            continue;
        }
        if (topology.getOutputs().get(upstream).size() != 1) {
            // This sibling fans out to multiple vertices; its per-edge split is
            // unknown, so the subtraction approach is unusable.
            otherInputsRate = Double.NaN;
            break;
        }
        otherInputsRate += getNumRecordsOutPerSecond(flinkMetrics.get(upstream), upstream);
    }
    if (!Double.isNaN(otherInputsRate)) {
        LOG.debug(
                "Computing edge ({}, {}) data rate by subtracting upstream input rates",
                from,
                to);
        return getNumRecordsInPerSecond(destinationMetrics, to, false) - otherInputsRate;
    }

    // Case 3: least reliable fallback — attribute 'from's entire output rate to this
    // edge, even though it may also feed other downstream vertices.
    LOG.debug(
            "Computing edge ({}, {}) data rate by falling back to from num records out",
            from,
            to);
    return getNumRecordsOutPerSecond(flinkMetrics.get(from), from);
}