public void deploy()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java [134:189]


    public void deploy(
            FlinkResourceContext<FlinkDeployment> ctx,
            FlinkDeploymentSpec spec,
            Configuration deployConfig,
            Optional<String> savepoint,
            boolean requireHaMetadata)
            throws Exception {

        var deployment = ctx.getResource();
        var status = deployment.getStatus();
        var flinkService = ctx.getFlinkService();

        // A fresh deployment invalidates any previously recorded cluster health info.
        ClusterHealthEvaluator.removeLastValidClusterHealthInfo(status.getClusterInfo());

        if (savepoint.isPresent()) {
            // Restore from the explicitly provided savepoint path.
            deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepoint.get());
        } else if (requireHaMetadata && flinkService.atLeastOneCheckpoint(deployConfig)) {
            // Last-state upgrade: point the savepoint path at a dummy location so that an
            // accidental deletion of the HA metadata by the user cannot silently trigger an
            // incorrect state restore.
            deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, LAST_STATE_DUMMY_SP_PATH);
            status.getJobStatus().setUpgradeSavepointPath(LAST_STATE_DUMMY_SP_PATH);
        } else {
            // Stateless start: drop any savepoint path the user may have configured.
            deployConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
        }

        setOwnerReference(deployment, deployConfig);
        setRandomJobResultStorePath(deployConfig);

        // If a cluster for the terminated application is still around, tear it down and
        // persist the status before submitting the new one.
        if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
            Preconditions.checkArgument(ReconciliationUtils.isJobInTerminalState(status));
            LOG.info("Deleting cluster with terminated application before new deployment");
            flinkService.deleteClusterDeployment(
                    deployment.getMetadata(), status, deployConfig, !requireHaMetadata);
            statusRecorder.patchAndCacheStatus(deployment, ctx.getKubernetesClient());
        }

        setJobIdIfNecessary(deployment, deployConfig, ctx.getKubernetesClient(), requireHaMetadata);

        eventRecorder.triggerEvent(
                deployment,
                EventRecorder.Type.Normal,
                EventRecorder.Reason.Submit,
                EventRecorder.Component.JobManagerDeployment,
                MSG_SUBMIT,
                ctx.getKubernetesClient());

        // Submit the application cluster, then mark the resource as deploying/reconciling.
        flinkService.submitApplicationCluster(spec.getJob(), deployConfig, requireHaMetadata);
        status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);

        IngressUtils.updateIngressRules(
                deployment.getMetadata(), spec, deployConfig, ctx.getKubernetesClient());
    }