in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java [143:191]
public static Map<ScalingMetric, Double> computeGlobalMetrics(
Map<FlinkMetric, Metric> collectedJmMetrics,
Map<FlinkMetric, AggregatedMetric> collectedTmMetrics,
Configuration conf) {
if (collectedTmMetrics == null) {
return null;
}
var out = new HashMap<ScalingMetric, Double>();
try {
var numTotalTaskSlots =
Double.valueOf(
collectedJmMetrics.get(FlinkMetric.NUM_TASK_SLOTS_TOTAL).getValue());
var numTaskSlotsAvailable =
Double.valueOf(
collectedJmMetrics
.get(FlinkMetric.NUM_TASK_SLOTS_AVAILABLE)
.getValue());
out.put(ScalingMetric.NUM_TASK_SLOTS_USED, numTotalTaskSlots - numTaskSlotsAvailable);
} catch (Exception e) {
LOG.debug("Slot metrics and registered task managers not available");
}
var gcTime = collectedTmMetrics.get(FlinkMetric.TOTAL_GC_TIME_PER_SEC);
if (gcTime != null) {
out.put(ScalingMetric.GC_PRESSURE, gcTime.getMax() / 1000);
}
var heapMax = collectedTmMetrics.get(FlinkMetric.HEAP_MEMORY_MAX);
var heapUsed = collectedTmMetrics.get(FlinkMetric.HEAP_MEMORY_USED);
if (heapMax != null && heapUsed != null) {
out.put(ScalingMetric.HEAP_MEMORY_USED, heapUsed.getMax());
out.put(ScalingMetric.HEAP_MAX_USAGE_RATIO, heapUsed.getMax() / heapMax.getMax());
}
var managedMemory = collectedTmMetrics.get(FlinkMetric.MANAGED_MEMORY_USED);
if (managedMemory != null) {
out.put(ScalingMetric.MANAGED_MEMORY_USED, managedMemory.getMax());
}
var metaspaceMemory = collectedTmMetrics.get(FlinkMetric.METASPACE_MEMORY_USED);
if (metaspaceMemory != null) {
out.put(ScalingMetric.METASPACE_MEMORY_USED, metaspaceMemory.getMax());
}
return out;
}