in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java [321:373]
    private void computeTargetDataRate(
            JobTopology topology,
            JobVertexID vertex,
            Configuration conf,
            double inputRate,
            HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> alreadyEvaluated,
            SortedMap<Instant, CollectedMetrics> metricsHistory,
            Map<ScalingMetric, Double> latestVertexMetrics,
            Map<ScalingMetric, EvaluatedScalingMetric> out) {

        if (topology.isSource(vertex)) {
            double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();

            double lagRate = getRate(LAG, vertex, metricsHistory);
            double ingestionDataRate = Math.max(0, inputRate + lagRate);
            if (Double.isNaN(ingestionDataRate)) {
                throw new RuntimeException(
                        "Cannot evaluate metrics without ingestion rate information");
            }

            out.put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(ingestionDataRate));

            double lag = latestVertexMetrics.getOrDefault(LAG, 0.);
            double catchUpInputRate = catchUpTargetSec == 0 ? 0 : lag / catchUpTargetSec;
            if (catchUpInputRate > 0) {
                LOG.debug(
                        "Extra backlog processing input rate for {} is {}",
                        vertex,
                        catchUpInputRate);
            }

            out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpInputRate));
        } else {
            var inputs = topology.get(vertex).getInputs().keySet();
            double sumAvgTargetRate = 0;
            double sumCatchUpDataRate = 0;
            for (var inputVertex : inputs) {
                var inputEvaluatedMetrics = alreadyEvaluated.get(inputVertex);
                var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE);
                var outputRatio =
                        computeEdgeOutputRatio(inputVertex, vertex, topology, metricsHistory);
                LOG.debug(
                        "Computed output ratio for edge ({} -> {}) : {}",
                        inputVertex,
                        vertex,
                        outputRatio);
                sumAvgTargetRate += inputTargetRate.getAverage() * outputRatio;
                sumCatchUpDataRate +=
                        inputEvaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent() * outputRatio;
            }
            out.put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(sumAvgTargetRate));
            out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(sumCatchUpDataRate));
        }
    }
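
The method above computes two rates per vertex: for a source, the target data rate is the observed ingestion rate (input rate plus lag growth rate) and the catch-up rate is the current lag spread over the configured catch-up duration; for any other vertex, both rates are the sum of the upstream rates scaled by the per-edge output ratio. The following standalone sketch is illustrative only (it is not part of the Flink codebase) and reproduces that arithmetic with assumed numbers.

// Illustrative sketch, not Flink code: assumed measurements for one source vertex
// and one downstream vertex connected by a single edge.
public class TargetDataRateSketch {
    public static void main(String[] args) {
        // --- source vertex ---
        double inputRate = 5_000;        // records/s currently read by the source (assumed)
        double lagRate = 500;            // records/s the backlog is growing (assumed)
        double ingestionDataRate = Math.max(0, inputRate + lagRate); // 5,500 rec/s target

        double lag = 1_200_000;          // current backlog in records (assumed)
        double catchUpTargetSec = 600;   // catch-up duration of 10 minutes (assumed)
        // Extra rate needed to drain the backlog within the catch-up window.
        double catchUpInputRate = catchUpTargetSec == 0 ? 0 : lag / catchUpTargetSec; // 2,000 rec/s

        // --- downstream vertex fed by this source ---
        double outputRatio = 2.0;        // each input record yields two output records (assumed)
        double downstreamTargetRate = ingestionDataRate * outputRatio;  // 11,000 rec/s
        double downstreamCatchUpRate = catchUpInputRate * outputRatio;  //  4,000 rec/s

        System.out.printf(
                "source: target=%.0f catchUp=%.0f, downstream: target=%.0f catchUp=%.0f%n",
                ingestionDataRate, catchUpInputRate, downstreamTargetRate, downstreamCatchUpRate);
    }
}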