in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java [296:335]
    protected FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
        KubernetesDeploymentMode deploymentMode = KubernetesDeploymentMode.getDeploymentMode(spec);
        if (spec.getJob() != null) {
            JobSpec jobSpec = spec.getJob();
            effectiveConfig.set(
                    DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());

            if (deploymentMode == KubernetesDeploymentMode.NATIVE && jobSpec.getJarURI() != null) {
                effectiveConfig.set(
                        PipelineOptions.JARS,
                        Collections.singletonList(new URI(jobSpec.getJarURI()).toString()));
            }

            effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, getParallelism());

            // We need to keep the application clusters around for proper operator behaviour
            effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
            setDefaultConf(SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);

            // Generic shared job config logic
            applyJobConfig(clusterId, effectiveConfig, jobSpec);
        } else {
            effectiveConfig.set(
                    DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
        }
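
        // Standalone mode: the deployment target falls back to the generic "remote" executor
        // and the job jar is exposed via pipeline.classpaths rather than pipeline.jars.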
        if (deploymentMode == KubernetesDeploymentMode.STANDALONE) {
            effectiveConfig.set(DeploymentOptions.TARGET, "remote");
            effectiveConfig.set(
                    StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
                    spec.getJob() == null ? SESSION : APPLICATION);

            if (spec.getJob() != null && spec.getJob().getJarURI() != null) {
                effectiveConfig.set(
                        PipelineOptions.CLASSPATHS,
                        Collections.singletonList(getStandaloneJarURI(spec.getJob())));
            }
        }
        return this;
    }
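
For reference, a minimal, hypothetical sketch (not part of the operator code) of the effective configuration this method produces for a native application deployment. The jar URI and parallelism below are placeholder values, and only standard Flink ConfigOptions are used; the operator-internal helpers (getParallelism, setDefaultConf, applyJobConfig) are intentionally left out.

// Sketch only: illustrates the config keys applyJobOrSessionSpec() sets for a NATIVE
// application deployment. Values are placeholders, not taken from a real FlinkDeployment.
import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;

public class EffectiveConfigSketch {
    public static void main(String[] args) {
        Configuration effectiveConfig = new Configuration();
        // spec.getJob() != null -> application deployment target
        effectiveConfig.set(DeploymentOptions.TARGET, "kubernetes-application");
        // jobSpec.getJarURI(), normalized through java.net.URI
        effectiveConfig.set(
                PipelineOptions.JARS,
                Collections.singletonList("local:///opt/flink/usrlib/my-job.jar"));
        // parallelism taken from the JobSpec
        effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, 2);
        // keep the application cluster around so the operator can keep observing it
        effectiveConfig.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, false);
        System.out.println(effectiveConfig);
    }
}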