in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java [138:178]
/**
 * Copies the user-supplied Flink configuration from the spec into the effective configuration
 * and then layers the operator's defaults on top of it.
 *
 * <p>Defaults are applied through {@code setDefaultConf}, which is defined elsewhere in this
 * class and is expected to leave explicitly configured keys untouched (NOTE(review): confirm
 * against its implementation). Values written with {@code effectiveConfig.set} are applied
 * unconditionally.
 *
 * @return this builder, to allow call chaining
 */
protected FlinkConfigBuilder applyFlinkConfiguration() {
    // User configuration goes in first so that the defaults below can take it into account.
    // Iterating an empty map is a no-op, so only the null case needs guarding.
    if (spec.getFlinkConfiguration() != null) {
        spec.getFlinkConfiguration().forEach(effectiveConfig::setString);
    }

    // Adapt default rest service type from 1.15+: default to ClusterIP.
    setDefaultConf(
            REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);

    // Default 'web.cancel.enable' to false to avoid users accidentally cancelling jobs.
    setDefaultConf(CANCEL_ENABLE, false);

    // Job-specific defaults only make sense when the spec actually describes a job.
    if (spec.getJob() != null) {
        applyApplicationJobDefaults();
    }

    // The Flink version from the spec is always recorded in the effective configuration.
    effectiveConfig.set(FLINK_VERSION, spec.getFlinkVersion());
    return this;
}

/**
 * Applies the defaults that are only relevant when the spec contains a job. Must only be
 * called when {@code spec.getJob()} is non-null.
 */
private void applyApplicationJobDefaults() {
    // Use the resource name as 'pipeline.name' by default for application deployments.
    setDefaultConf(PipelineOptions.NAME, clusterId);

    // With last-state upgrade mode, default 'execution.checkpointing.interval' to
    // DEFAULT_CHECKPOINTING_INTERVAL. Only the upgrade mode is checked here; no
    // high-availability check is performed for this default.
    if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
        setDefaultConf(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                DEFAULT_CHECKPOINTING_INTERVAL);
    }

    // Application clusters must be kept around for proper operator behaviour, so this is
    // forced rather than defaulted. It is only applied for Flink versions newer than 1.14
    // (presumably because earlier versions do not support the option - TODO confirm).
    if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14)) {
        effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
    }

    // Evaluated against the effective configuration, i.e. after the user's settings were
    // copied in, so a user-enabled HA mode is taken into account.
    if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {
        setDefaultConf(SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);
    }

    // Retain externalized checkpoints on cancellation by default.
    setDefaultConf(
            ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT,
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}