in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java [140:189]
public void deploy(
FlinkResourceContext<FlinkDeployment> ctx,
FlinkDeploymentSpec spec,
Configuration deployConfig,
Optional<String> savepoint,
boolean requireHaMetadata)
throws Exception {
var relatedResource = ctx.getResource();
var status = relatedResource.getStatus();
var flinkService = ctx.getFlinkService();
ClusterHealthEvaluator.removeLastValidClusterHealthInfo(
relatedResource.getStatus().getClusterInfo());
if (savepoint.isPresent()) {
deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepoint.get());
} else {
deployConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
}
setOwnerReference(relatedResource, deployConfig);
setRandomJobResultStorePath(deployConfig);
if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
if (!ReconciliationUtils.isJobInTerminalState(status)) {
LOG.error("Invalid status for deployment: {}", status);
throw new RuntimeException("This indicates a bug...");
}
LOG.info("Deleting deployment with terminated application before new deployment");
flinkService.deleteClusterDeployment(
relatedResource.getMetadata(), status, deployConfig, true);
flinkService.waitForClusterShutdown(deployConfig);
}
setJobIdIfNecessary(spec, relatedResource, deployConfig);
eventRecorder.triggerEvent(
relatedResource,
EventRecorder.Type.Normal,
EventRecorder.Reason.Submit,
EventRecorder.Component.JobManagerDeployment,
MSG_SUBMIT);
flinkService.submitApplicationCluster(spec.getJob(), deployConfig, requireHaMetadata);
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
IngressUtils.updateIngressRules(
relatedResource.getMetadata(), spec, deployConfig, kubernetesClient);
}