in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java [462:492]
/**
 * Computes the arithmetic mean of {@code metric} across all entries of {@code metricsHistory}.
 *
 * <p>For every history entry the value is looked up in the metrics of {@code jobVertexId}, or
 * in the global metrics when {@code jobVertexId} is {@code null}. An entry is skipped when it
 * holds no metrics map for the requested scope, holds no value for {@code metric}, or the
 * value is {@code NaN}. Infinite values are left out of the sum but remembered, so that a
 * history consisting solely of infinite samples can be reported as infinite.
 *
 * @param metric the metric to average
 * @param jobVertexId the vertex whose metrics are read, or {@code null} to read the global
 *     metrics
 * @param metricsHistory the collected metrics, keyed by {@link Instant}
 * @param minElements the minimum number of finite samples required to produce an average
 * @return the mean of the finite samples; {@link Double#POSITIVE_INFINITY} when there is no
 *     finite sample but at least one infinite one; {@link Double#NaN} when there is no usable
 *     sample at all, or fewer than {@code minElements} finite samples
 */
public static double getAverage(
        ScalingMetric metric,
        @Nullable JobVertexID jobVertexId,
        SortedMap<Instant, CollectedMetrics> metricsHistory,
        int minElements) {
    double sum = 0;
    int n = 0;
    boolean anyInfinite = false;
    for (var collectedMetrics : metricsHistory.values()) {
        var metrics =
                jobVertexId != null
                        ? collectedMetrics.getVertexMetrics().get(jobVertexId)
                        : collectedMetrics.getGlobalMetrics();
        if (metrics == null) {
            // This history entry carries no metrics for the requested scope; treat it
            // as a missing sample instead of failing with a NullPointerException.
            continue;
        }
        // Keep the boxed value so that an absent key or an explicit null mapping can be
        // detected before unboxing.
        Double value = metrics.get(metric);
        if (value == null || value.isNaN()) {
            continue;
        }
        if (value.isInfinite()) {
            // Infinite samples would poison the sum; only record that one was seen.
            anyInfinite = true;
            continue;
        }
        sum += value;
        n++;
    }
    if (n == 0) {
        // No finite sample at all: report infinity if that is all that was observed.
        return anyInfinite ? Double.POSITIVE_INFINITY : Double.NaN;
    }
    return n < minElements ? Double.NaN : sum / n;
}