public static ConfigChanges tuneTaskManagerMemory()

in flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java [75:217]


    /**
     * Computes TaskManager memory configuration overrides from the observed memory usage and the
     * pending scaling decisions.
     *
     * <p>The user-spec memory configuration is parsed first. The pools which are not tuned
     * (framework off-heap, task off-heap and JVM overhead) are reserved from the spec'd TaskManager
     * memory budget. The remaining budget is handed out to network, metaspace, heap and managed
     * memory, in that order. After all pools have been distributed, the heap is rescaled according
     * to the scaling decisions.
     *
     * <p>Once a recommendation has been computed, it is always published as a "Configuration
     * recommendation" event. It is only returned (and therefore applied) when {@link
     * AutoScalerOptions#MEMORY_TUNING_ENABLED} is set.
     *
     * @param context the autoscaler context providing the user-spec configuration and the spec'd
     *     TaskManager memory
     * @param evaluatedMetrics evaluated global metrics (memory usage) and per-vertex metrics
     * @param jobTopology the job topology, used to size network memory
     * @param scalingSummaries per-vertex scaling decisions, used to derive the new parallelisms and
     *     to rescale the heap
     * @param eventHandler handler used to publish the recommendation event
     * @return the configuration changes to apply, or {@code EMPTY_CONFIG} in any of these cases:
     *     the current memory configuration is invalid; the spec'd TaskManager memory cannot be
     *     determined; the computed total memory is not positive; memory tuning is disabled
     */
    public static ConfigChanges tuneTaskManagerMemory(
            JobAutoScalerContext<?> context,
            EvaluatedMetrics evaluatedMetrics,
            JobTopology jobTopology,
            Map<JobVertexID, ScalingSummary> scalingSummaries,
            AutoScalerEventHandler eventHandler) {

        // Please note that this config is the original configuration created from the user spec.
        // It does not contain any already applied overrides.
        var config = new UnmodifiableConfiguration(context.getConfiguration());

        // Gather original memory configuration from the user spec
        CommonProcessMemorySpec<TaskExecutorFlinkMemory> memSpecs;
        try {
            memSpecs = FLINK_MEMORY_UTILS.memoryProcessSpecFromConfig(config);
        } catch (IllegalConfigurationException e) {
            // Keep the cause so users can see which memory option is invalid.
            LOG.warn("Current memory configuration is not valid. Aborting memory tuning.", e);
            return EMPTY_CONFIG;
        }

        MemorySize specHeapSize = memSpecs.getFlinkMemory().getJvmHeapMemorySize();
        MemorySize specManagedSize = memSpecs.getFlinkMemory().getManaged();
        MemorySize specNetworkSize = memSpecs.getFlinkMemory().getNetwork();
        MemorySize specMetaspaceSize = memSpecs.getJvmMetaspaceSize();
        LOG.info(
                "Spec memory - heap: {}, managed: {}, network: {}, meta: {}",
                specHeapSize.toHumanReadableString(),
                specManagedSize.toHumanReadableString(),
                specNetworkSize.toHumanReadableString(),
                specMetaspaceSize.toHumanReadableString());

        MemorySize maxMemoryBySpec = context.getTaskManagerMemory().orElse(MemorySize.ZERO);
        if (maxMemoryBySpec.compareTo(MemorySize.ZERO) <= 0) {
            LOG.warn("Spec TaskManager memory size could not be determined.");
            return EMPTY_CONFIG;
        }

        MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
        // Budget the original spec's memory settings which we do not modify
        memBudget.budget(memSpecs.getFlinkMemory().getFrameworkOffHeap().getBytes());
        memBudget.budget(memSpecs.getFlinkMemory().getTaskOffHeap().getBytes());
        memBudget.budget(memSpecs.getJvmOverheadSize().getBytes());

        var globalMetrics = evaluatedMetrics.getGlobalMetrics();
        // The order matters in case the memory usage is higher than the maximum available memory.
        // Managed memory comes last because it can grow arbitrary for RocksDB jobs.
        MemorySize newNetworkSize =
                adjustNetworkMemory(
                        jobTopology,
                        ResourceCheckUtils.computeNewParallelisms(
                                scalingSummaries, evaluatedMetrics.getVertexMetrics()),
                        config,
                        memBudget);
        // Assign memory to the METASPACE before the HEAP to ensure all needed memory is provided
        // to the METASPACE
        MemorySize newMetaspaceSize =
                determineNewSize(getUsage(METASPACE_MEMORY_USED, globalMetrics), config, memBudget);
        MemorySize newHeapSize =
                determineNewSize(getUsage(HEAP_MEMORY_USED, globalMetrics), config, memBudget);
        MemorySize newManagedSize =
                adjustManagedMemory(
                        getUsage(MANAGED_MEMORY_USED, globalMetrics),
                        specManagedSize,
                        config,
                        memBudget);
        // Rescale heap according to scaling decision after distributing all memory pools
        newHeapSize =
                MemoryScaling.applyMemoryScaling(
                        newHeapSize, memBudget, context, scalingSummaries, evaluatedMetrics);
        LOG.info(
                "Optimized memory sizes: heap: {}, managed: {}, network: {}, meta: {}",
                newHeapSize.toHumanReadableString(),
                newManagedSize.toHumanReadableString(),
                newNetworkSize.toHumanReadableString(),
                newMetaspaceSize.toHumanReadableString());

        // Diff can be negative (memory shrinks) or positive (memory grows)
        final long heapDiffBytes = newHeapSize.getBytes() - specHeapSize.getBytes();
        final long managedDiffBytes = newManagedSize.getBytes() - specManagedSize.getBytes();
        final long networkDiffBytes = newNetworkSize.getBytes() - specNetworkSize.getBytes();
        final long flinkMemoryDiffBytes = heapDiffBytes + managedDiffBytes + networkDiffBytes;

        // The new total process memory is the part of the spec'd TaskManager memory that was
        // budgeted for the memory pools above, i.e. the spec'd maximum minus the unused rest.
        final MemorySize totalMemory =
                new MemorySize(maxMemoryBySpec.getBytes() - memBudget.getRemaining());
        if (totalMemory.compareTo(MemorySize.ZERO) <= 0) {
            LOG.warn("Invalid total memory configuration: {}", totalMemory);
            return EMPTY_CONFIG;
        }

        // Prepare the tuning config for new configuration values
        var tuningConfig = new ConfigChanges();
        // Adjust the total container memory
        tuningConfig.addOverride(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemory);
        // We do not set the framework/task heap memory because those are automatically derived from
        // setting the other mandatory memory options for managed memory, network, metaspace and jvm
        // overhead. However, we do precise accounting for heap memory above. In contrast to other
        // memory pools, there are no fractional variants for heap memory. Setting the absolute heap
        // memory options could cause invalid configuration states when users adapt the total amount
        // of memory. We also need to take care to remove any user-provided overrides for those.
        tuningConfig.addRemoval(TaskManagerOptions.TOTAL_FLINK_MEMORY);
        tuningConfig.addRemoval(TaskManagerOptions.TASK_HEAP_MEMORY);
        // Set default to zero because we already account for heap via task heap.
        tuningConfig.addOverride(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, MemorySize.ZERO);

        // Total Flink memory after tuning: the spec'd total shifted by the heap/managed/network
        // diffs (the other Flink memory pools are left unchanged).
        MemorySize flinkMemorySize =
                new MemorySize(
                        memSpecs.getTotalFlinkMemorySize().getBytes() + flinkMemoryDiffBytes);

        // All memory options which can be configured via fractions need to be re-calculated.
        tuningConfig.addOverride(
                TaskManagerOptions.MANAGED_MEMORY_FRACTION,
                getFraction(newManagedSize, flinkMemorySize));
        tuningConfig.addRemoval(TaskManagerOptions.MANAGED_MEMORY_SIZE);

        // Pin network memory to an exact size by setting min == max.
        tuningConfig.addOverride(TaskManagerOptions.NETWORK_MEMORY_MIN, newNetworkSize);
        tuningConfig.addOverride(TaskManagerOptions.NETWORK_MEMORY_MAX, newNetworkSize);

        tuningConfig.addOverride(
                TaskManagerOptions.JVM_OVERHEAD_FRACTION,
                getFraction(memSpecs.getJvmOverheadSize(), totalMemory));

        tuningConfig.addOverride(TaskManagerOptions.JVM_METASPACE, newMetaspaceSize);

        // Read the flag once from the same (unmodifiable) config view used throughout this method,
        // so the event text and the return decision cannot disagree.
        final boolean tuningEnabled = config.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);

        eventHandler.handleEvent(
                context,
                AutoScalerEventHandler.Type.Normal,
                "Configuration recommendation",
                String.format(
                        "Memory tuning recommends the following configuration (automatic tuning is %s):\n%s",
                        tuningEnabled ? "enabled" : "disabled",
                        formatConfig(tuningConfig)),
                "MemoryTuning",
                config.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));

        // The recommendation is always published, but only applied when tuning is enabled.
        return tuningEnabled ? tuningConfig : EMPTY_CONFIG;
    }