protected FlinkConfigBuilder applyJobOrSessionSpec()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java [298:348]


    /**
     * Applies the job (application) or session part of the deployment spec to the effective
     * configuration.
     *
     * <p>When the spec carries a job, the deployment target is set to application and the jar,
     * default parallelism, non-restored-state flag, entry class and program arguments are copied
     * over; each optional value is only written when it is non-null. Without a job the target is
     * set to session. For the standalone deployment mode the target is afterwards overwritten
     * with {@code "remote"}, the cluster mode is recorded, and the job jar (if any) is added to
     * the pipeline classpaths.
     *
     * @return this builder
     * @throws URISyntaxException if the job's jar URI string cannot be parsed as a {@link URI}
     */
    protected FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
        final KubernetesDeploymentMode mode = KubernetesDeploymentMode.getDeploymentMode(spec);
        final JobSpec job = spec.getJob();
        final boolean hasJob = job != null;

        // A job in the spec means application deployment; otherwise a session cluster.
        final KubernetesDeploymentTarget target =
                hasJob ? KubernetesDeploymentTarget.APPLICATION : KubernetesDeploymentTarget.SESSION;
        effectiveConfig.set(DeploymentOptions.TARGET, target.getName());

        if (hasJob) {
            final String jarUri = job.getJarURI();
            if (jarUri != null) {
                // Round-trip through URI so a malformed value fails here.
                final String parsedJar = new URI(jarUri).toString();
                effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(parsedJar));
            }

            effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, getParallelism());

            final Boolean allowNonRestored = job.getAllowNonRestoredState();
            if (allowNonRestored != null) {
                effectiveConfig.set(
                        SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, allowNonRestored);
            }

            final String entryClass = job.getEntryClass();
            if (entryClass != null) {
                effectiveConfig.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, entryClass);
            }

            if (job.getArgs() != null) {
                effectiveConfig.set(
                        ApplicationConfiguration.APPLICATION_ARGS, Arrays.asList(job.getArgs()));
            }
        }

        if (mode != KubernetesDeploymentMode.STANDALONE) {
            return this;
        }

        // Standalone mode replaces the target chosen above.
        effectiveConfig.set(DeploymentOptions.TARGET, "remote");
        effectiveConfig.set(
                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
                hasJob ? APPLICATION : SESSION);

        if (hasJob && job.getJarURI() != null) {
            effectiveConfig.set(
                    PipelineOptions.CLASSPATHS,
                    Collections.singletonList(getStandaloneJarURI(job)));
        }
        return this;
    }