in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java [188:241]
/**
 * Writes the {@code TARGET_DATA_RATE} and {@code CATCH_UP_DATA_RATE} entries for {@code vertex}
 * into {@code out}.
 *
 * <p>For a source vertex, the target rate is built from the latest {@code SOURCE_DATA_RATE} value
 * and its average over {@code metricsHistory}; the catch-up rate is the latest {@code LAG}
 * (0 when absent) divided by the configured catch-up duration in seconds, or 0 when that
 * duration is zero seconds.
 *
 * <p>For any other vertex, both rates are the sum over all inputs of the input's already
 * evaluated rate multiplied by the average output ratio of the connecting edge.
 *
 * @param topology job topology used to tell sources apart and to look up the vertex inputs
 * @param vertex vertex whose rates are computed
 * @param conf configuration providing {@code AutoScalerOptions.CATCH_UP_DURATION}
 * @param alreadyEvaluated evaluated metrics of other vertices; every input of a non-source
 *     vertex is looked up here without a null check, so it is presumably expected to be
 *     populated upstream-first by the caller (confirm against the call site)
 * @param metricsHistory collected metric history used for the averages
 * @param latestVertexMetrics most recent metric values of {@code vertex}
 * @param out destination map receiving the two evaluated metrics
 * @throws RuntimeException if {@code vertex} is a source and {@code latestVertexMetrics} has no
 *     {@code SOURCE_DATA_RATE} entry
 */
private void computeTargetDataRate(
        JobTopology topology,
        JobVertexID vertex,
        Configuration conf,
        HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> alreadyEvaluated,
        SortedMap<Instant, CollectedMetrics> metricsHistory,
        Map<ScalingMetric, Double> latestVertexMetrics,
        Map<ScalingMetric, EvaluatedScalingMetric> out) {

    if (!topology.isSource(vertex)) {
        // Non-source vertex: propagate the rates of all upstream vertices, scaled by how
        // much output each upstream vertex sends over the edge leading to this vertex.
        double currentRateTotal = 0;
        double averageRateTotal = 0;
        double catchUpRateTotal = 0;

        for (var upstream : topology.getInputs().get(vertex)) {
            var upstreamMetrics = alreadyEvaluated.get(upstream);
            var upstreamTargetRate = upstreamMetrics.get(TARGET_DATA_RATE);
            var edgeRatio = getAverageOutputRatio(new Edge(upstream, vertex), metricsHistory);

            currentRateTotal = currentRateTotal + upstreamTargetRate.getCurrent() * edgeRatio;
            averageRateTotal = averageRateTotal + upstreamTargetRate.getAverage() * edgeRatio;
            catchUpRateTotal =
                    catchUpRateTotal
                            + upstreamMetrics.get(CATCH_UP_DATA_RATE).getCurrent() * edgeRatio;
        }

        var propagatedTargetRate =
                new EvaluatedScalingMetric(currentRateTotal, averageRateTotal);
        out.put(TARGET_DATA_RATE, propagatedTargetRate);
        out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpRateTotal));
        return;
    }

    // Source vertex: rates come directly from the vertex's own metrics.
    double catchUpSeconds = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();

    boolean hasSourceRate = latestVertexMetrics.containsKey(SOURCE_DATA_RATE);
    if (!hasSourceRate) {
        throw new RuntimeException(
                "Cannot evaluate metrics without source target rate information");
    }

    var latestSourceRate = latestVertexMetrics.get(SOURCE_DATA_RATE);
    var averageSourceRate = getAverage(SOURCE_DATA_RATE, vertex, metricsHistory);
    out.put(TARGET_DATA_RATE, new EvaluatedScalingMetric(latestSourceRate, averageSourceRate));

    double backlog = latestVertexMetrics.getOrDefault(LAG, 0.);

    // A zero catch-up duration yields no extra catch-up rate (and avoids dividing by zero).
    double backlogRate;
    if (catchUpSeconds == 0) {
        backlogRate = 0;
    } else {
        backlogRate = backlog / catchUpSeconds;
    }

    if (backlogRate > 0) {
        LOG.debug("Extra backlog processing input rate for {} is {}", vertex, backlogRate);
    }
    out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(backlogRate));
}