in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java [334:409]
/**
 * Checks whether applying the given scaling decisions would require more CPU or memory than the
 * configured quota allows.
 *
 * <p>The slot demand of each slot sharing group is taken to be the highest parallelism among the
 * group's vertices that have an entry in {@code scalingSummaries}; vertices without a summary
 * are ignored. The per-group demands are summed and converted into a TaskManager count using
 * the configured number of slots per TaskManager, rounding up because a partially used
 * TaskManager still occupies a full TaskManager's resources.
 *
 * <p>The quota check is skipped (returning {@code false}) when there is no topology or slot
 * sharing information, when neither quota is configured, or when the new TaskManager count does
 * not exceed the current one.
 *
 * @param tunedConfig configuration from which the quotas, slots per TaskManager and
 *     TaskManager memory are read
 * @param jobTopology topology providing the slot sharing group to vertex mapping; may be
 *     {@code null}
 * @param scalingSummaries current and new parallelism per vertex being scaled
 * @param ctx autoscaler context providing the TaskManager CPU (treated as 0 when absent)
 * @return {@code true} if the estimated CPU or memory after scaling exceeds its quota
 */
protected static boolean scalingWouldExceedResourceQuota(
        Configuration tunedConfig,
        JobTopology jobTopology,
        Map<JobVertexID, ScalingSummary> scalingSummaries,
        JobAutoScalerContext<?> ctx) {

    if (jobTopology == null || jobTopology.getSlotSharingGroupMapping().isEmpty()) {
        return false;
    }

    var cpuQuota = tunedConfig.getOptional(AutoScalerOptions.CPU_QUOTA);
    var memoryQuota = tunedConfig.getOptional(AutoScalerOptions.MEMORY_QUOTA);
    if (!cpuQuota.isPresent() && !memoryQuota.isPresent()) {
        // Nothing to enforce.
        return false;
    }

    // Sum, over all slot sharing groups, the max parallelism of the group's scaled vertices.
    int currentTotalSlots = 0;
    int newTotalSlots = 0;
    for (var groupVertices : jobTopology.getSlotSharingGroupMapping().values()) {
        int currentMaxParallelism = 0;
        int newMaxParallelism = 0;
        for (var vertex : groupVertices) {
            var summary = scalingSummaries.get(vertex);
            if (summary == null) {
                continue;
            }
            currentMaxParallelism =
                    Math.max(currentMaxParallelism, summary.getCurrentParallelism());
            newMaxParallelism = Math.max(newMaxParallelism, summary.getNewParallelism());
        }
        currentTotalSlots += currentMaxParallelism;
        newTotalSlots += newMaxParallelism;
    }

    int numSlotsPerTm = tunedConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
    // Ceiling division: a TaskManager that is only partially filled still counts in full.
    // Plain integer division would round down and under-report the required resources.
    int currentNumTms = (currentTotalSlots + numSlotsPerTm - 1) / numSlotsPerTm;
    int newNumTms = (newTotalSlots + numSlotsPerTm - 1) / numSlotsPerTm;

    if (newNumTms <= currentNumTms) {
        LOG.debug(
                "Skipping quota check due to new resource allocation is less or equals than the current");
        return false;
    }

    if (cpuQuota.isPresent()) {
        LOG.debug("CPU resource quota is {}, checking limits", cpuQuota.get());
        double tmCpu = ctx.getTaskManagerCpu().orElse(0.);
        double totalCPU = tmCpu * newNumTms;
        if (totalCPU > cpuQuota.get()) {
            LOG.info("CPU resource quota reached with value: {}", totalCPU);
            return true;
        }
    }

    if (memoryQuota.isPresent()) {
        LOG.debug("Memory resource quota is {}, checking limits", memoryQuota.get());
        var tmMemory = MemoryTuning.getTotalMemory(tunedConfig, ctx);
        long totalMemory = tmMemory.getBytes() * newNumTms;
        if (totalMemory > memoryQuota.get().getBytes()) {
            LOG.info(
                    "Memory resource quota reached with value: {}",
                    new MemorySize(totalMemory));
            return true;
        }
    }

    return false;
}