protected FlinkConfigBuilder applyFlinkConfiguration()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java [138:178]


    protected FlinkConfigBuilder applyFlinkConfiguration() {
        // Parse config from spec's flinkConfiguration
        if (spec.getFlinkConfiguration() != null && !spec.getFlinkConfiguration().isEmpty()) {
            spec.getFlinkConfiguration().forEach(effectiveConfig::setString);
        }

        // Adapt default rest service type from 1.15+
        setDefaultConf(
                REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);
        // Set 'web.cancel.enable' to false to avoid users accidentally cancelling jobs.
        setDefaultConf(CANCEL_ENABLE, false);

        if (spec.getJob() != null) {
            // Set 'pipeline.name' to resource name by default for application deployments.
            setDefaultConf(PipelineOptions.NAME, clusterId);

            // With last-state upgrade mode, set the default value of
            // 'execution.checkpointing.interval'
            // to 5 minutes when HA is enabled.
            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
                setDefaultConf(
                        ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                        DEFAULT_CHECKPOINTING_INTERVAL);
            }

            // We need to keep the application clusters around for proper operator behaviour
            if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14)) {
                effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
            }
            if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {
                setDefaultConf(SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);
            }

            setDefaultConf(
                    ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT,
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }

        effectiveConfig.set(FLINK_VERSION, spec.getFlinkVersion());
        return this;
    }