in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java [298:348]
protected FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
KubernetesDeploymentMode deploymentMode = KubernetesDeploymentMode.getDeploymentMode(spec);
if (spec.getJob() != null) {
JobSpec jobSpec = spec.getJob();
effectiveConfig.set(
DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
if (jobSpec.getJarURI() != null) {
final URI uri = new URI(jobSpec.getJarURI());
effectiveConfig.set(
PipelineOptions.JARS, Collections.singletonList(uri.toString()));
}
effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, getParallelism());
if (jobSpec.getAllowNonRestoredState() != null) {
effectiveConfig.set(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
jobSpec.getAllowNonRestoredState());
}
if (jobSpec.getEntryClass() != null) {
effectiveConfig.set(
ApplicationConfiguration.APPLICATION_MAIN_CLASS, jobSpec.getEntryClass());
}
if (jobSpec.getArgs() != null) {
effectiveConfig.set(
ApplicationConfiguration.APPLICATION_ARGS,
Arrays.asList(jobSpec.getArgs()));
}
} else {
effectiveConfig.set(
DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
}
if (deploymentMode == KubernetesDeploymentMode.STANDALONE) {
effectiveConfig.set(DeploymentOptions.TARGET, "remote");
effectiveConfig.set(
StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
spec.getJob() == null ? SESSION : APPLICATION);
if (spec.getJob() != null && spec.getJob().getJarURI() != null) {
effectiveConfig.set(
PipelineOptions.CLASSPATHS,
Collections.singletonList(getStandaloneJarURI(spec.getJob())));
}
}
return this;
}