in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java [75:217]
/**
 * Computes a recommended TaskManager memory configuration from the user spec and the evaluated
 * memory usage metrics.
 *
 * <p>The recommendation is expressed as {@link ConfigChanges} (overrides and removals) instead of
 * a modified configuration. Once a recommendation could be computed, a "Configuration
 * recommendation" event is emitted regardless of whether tuning is enabled; the changes
 * themselves are only returned when {@code AutoScalerOptions.MEMORY_TUNING_ENABLED} is set.
 *
 * <p>Memory pools are sized against a shared budget in a fixed order (network, metaspace, heap,
 * managed); the order is significant when the observed usage exceeds the available memory.
 *
 * @param context job context supplying the user configuration and the TaskManager memory size
 * @param evaluatedMetrics evaluated metrics; the global metrics provide the memory usage and the
 *     vertex metrics are used to compute the new parallelisms
 * @param jobTopology job topology handed to the network memory adjustment
 * @param scalingSummaries per-vertex scaling decisions, used for the new parallelisms and for
 *     rescaling the heap
 * @param eventHandler handler that receives the recommendation event
 * @return the recommended config changes, or {@code EMPTY_CONFIG} when the spec memory
 *     configuration is invalid, the TaskManager memory size is unknown or not positive, the
 *     computed total memory is not positive, or memory tuning is disabled
 */
public static ConfigChanges tuneTaskManagerMemory(
        JobAutoScalerContext<?> context,
        EvaluatedMetrics evaluatedMetrics,
        JobTopology jobTopology,
        Map<JobVertexID, ScalingSummary> scalingSummaries,
        AutoScalerEventHandler eventHandler) {
    // Please note that this config is the original configuration created from the user spec.
    // It does not contain any already applied overrides.
    var config = new UnmodifiableConfiguration(context.getConfiguration());

    // Gather original memory configuration from the user spec
    CommonProcessMemorySpec<TaskExecutorFlinkMemory> memSpecs;
    try {
        memSpecs = FLINK_MEMORY_UTILS.memoryProcessSpecFromConfig(config);
    } catch (IllegalConfigurationException e) {
        // Log the cause as well, otherwise there is no way to tell which option is invalid.
        LOG.warn("Current memory configuration is not valid. Aborting memory tuning.", e);
        return EMPTY_CONFIG;
    }

    MemorySize specHeapSize = memSpecs.getFlinkMemory().getJvmHeapMemorySize();
    MemorySize specManagedSize = memSpecs.getFlinkMemory().getManaged();
    MemorySize specNetworkSize = memSpecs.getFlinkMemory().getNetwork();
    MemorySize specMetaspaceSize = memSpecs.getJvmMetaspaceSize();
    LOG.info(
            "Spec memory - heap: {}, managed: {}, network: {}, meta: {}",
            specHeapSize.toHumanReadableString(),
            specManagedSize.toHumanReadableString(),
            specNetworkSize.toHumanReadableString(),
            specMetaspaceSize.toHumanReadableString());

    // Without a positive upper bound there is nothing to budget against.
    MemorySize maxMemoryBySpec = context.getTaskManagerMemory().orElse(MemorySize.ZERO);
    if (maxMemoryBySpec.compareTo(MemorySize.ZERO) <= 0) {
        LOG.warn("Spec TaskManager memory size could not be determined.");
        return EMPTY_CONFIG;
    }

    MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
    // Budget the original spec's memory settings which we do not modify
    memBudget.budget(memSpecs.getFlinkMemory().getFrameworkOffHeap().getBytes());
    memBudget.budget(memSpecs.getFlinkMemory().getTaskOffHeap().getBytes());
    memBudget.budget(memSpecs.getJvmOverheadSize().getBytes());

    var globalMetrics = evaluatedMetrics.getGlobalMetrics();

    // The order matters in case the memory usage is higher than the maximum available memory.
    // Managed memory comes last because it can grow arbitrarily for RocksDB jobs.
    MemorySize newNetworkSize =
            adjustNetworkMemory(
                    jobTopology,
                    ResourceCheckUtils.computeNewParallelisms(
                            scalingSummaries, evaluatedMetrics.getVertexMetrics()),
                    config,
                    memBudget);
    // Assign memory to the METASPACE before the HEAP to ensure all needed memory is provided
    // to the METASPACE
    MemorySize newMetaspaceSize =
            determineNewSize(getUsage(METASPACE_MEMORY_USED, globalMetrics), config, memBudget);
    MemorySize newHeapSize =
            determineNewSize(getUsage(HEAP_MEMORY_USED, globalMetrics), config, memBudget);
    MemorySize newManagedSize =
            adjustManagedMemory(
                    getUsage(MANAGED_MEMORY_USED, globalMetrics),
                    specManagedSize,
                    config,
                    memBudget);
    // Rescale heap according to scaling decision after distributing all memory pools
    newHeapSize =
            MemoryScaling.applyMemoryScaling(
                    newHeapSize, memBudget, context, scalingSummaries, evaluatedMetrics);
    LOG.info(
            "Optimized memory sizes: heap: {} managed: {}, network: {}, meta: {}",
            newHeapSize.toHumanReadableString(),
            newManagedSize.toHumanReadableString(),
            newNetworkSize.toHumanReadableString(),
            newMetaspaceSize.toHumanReadableString());

    // Diff can be negative (memory shrinks) or positive (memory grows)
    final long heapDiffBytes = newHeapSize.getBytes() - specHeapSize.getBytes();
    final long managedDiffBytes = newManagedSize.getBytes() - specManagedSize.getBytes();
    final long networkDiffBytes = newNetworkSize.getBytes() - specNetworkSize.getBytes();
    final long flinkMemoryDiffBytes = heapDiffBytes + managedDiffBytes + networkDiffBytes;

    // The new total is the spec maximum minus whatever the budget still reports as remaining.
    final MemorySize totalMemory =
            new MemorySize(maxMemoryBySpec.getBytes() - memBudget.getRemaining());
    if (totalMemory.compareTo(MemorySize.ZERO) <= 0) {
        LOG.warn("Invalid total memory configuration: {}", totalMemory);
        return EMPTY_CONFIG;
    }

    // Prepare the tuning config for new configuration values
    var tuningConfig = new ConfigChanges();
    // Adjust the total container memory
    tuningConfig.addOverride(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemory);
    // We do not set the framework/task heap memory because those are automatically derived from
    // setting the other mandatory memory options for managed memory, network, metaspace and jvm
    // overhead. However, we do precise accounting for heap memory above. In contrast to other
    // memory pools, there are no fractional variants for heap memory. Setting the absolute heap
    // memory options could cause invalid configuration states when users adapt the total amount
    // of memory. We also need to take care to remove any user-provided overrides for those.
    tuningConfig.addRemoval(TaskManagerOptions.TOTAL_FLINK_MEMORY);
    tuningConfig.addRemoval(TaskManagerOptions.TASK_HEAP_MEMORY);
    // Set default to zero because we already account for heap via task heap.
    tuningConfig.addOverride(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, MemorySize.ZERO);

    // Total Flink memory of the spec, shifted by the changes to heap, managed and network.
    MemorySize flinkMemorySize =
            new MemorySize(
                    memSpecs.getTotalFlinkMemorySize().getBytes() + flinkMemoryDiffBytes);

    // All memory options which can be configured via fractions need to be re-calculated.
    tuningConfig.addOverride(
            TaskManagerOptions.MANAGED_MEMORY_FRACTION,
            getFraction(newManagedSize, flinkMemorySize));
    tuningConfig.addRemoval(TaskManagerOptions.MANAGED_MEMORY_SIZE);
    // Min and max are set to the same value to pin the network memory size.
    tuningConfig.addOverride(TaskManagerOptions.NETWORK_MEMORY_MIN, newNetworkSize);
    tuningConfig.addOverride(TaskManagerOptions.NETWORK_MEMORY_MAX, newNetworkSize);
    tuningConfig.addOverride(
            TaskManagerOptions.JVM_OVERHEAD_FRACTION,
            getFraction(memSpecs.getJvmOverheadSize(), totalMemory));
    tuningConfig.addOverride(TaskManagerOptions.JVM_METASPACE, newMetaspaceSize);

    // Read the flag once so the event text and the returned result cannot disagree.
    final boolean tuningEnabled = config.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);

    // The recommendation is always reported, even if it is not going to be applied.
    eventHandler.handleEvent(
            context,
            AutoScalerEventHandler.Type.Normal,
            "Configuration recommendation",
            String.format(
                    "Memory tuning recommends the following configuration (automatic tuning is %s):\n%s",
                    tuningEnabled ? "enabled" : "disabled",
                    formatConfig(tuningConfig)),
            "MemoryTuning",
            config.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));

    if (!tuningEnabled) {
        return EMPTY_CONFIG;
    }
    return tuningConfig;
}