in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java [134:189]
// Deploys the application cluster for the FlinkDeployment carried by ctx.
//
// In order, this method:
//   1. clears the last valid cluster health info stored in the resource status;
//   2. picks the restore mode by writing or removing SAVEPOINT_PATH in deployConfig
//      (savepoint, last-state, or stateless);
//   3. applies the owner reference and a random job result store path to deployConfig;
//   4. if the JobManager deployment is not MISSING, asserts the job is in a terminal
//      state, deletes the existing cluster deployment and patches the status;
//   5. sets the job id if necessary, emits a Submit event and submits the cluster;
//   6. marks the job as RECONCILING and the JobManager deployment as DEPLOYING,
//      then updates the ingress rules.
//
// ctx               - context giving access to the resource, Flink service and
//                     Kubernetes client
// spec              - deployment spec; its job spec is submitted and it is passed on
//                     to the ingress update
// deployConfig      - configuration used for the deployment; mutated in place
// savepoint         - savepoint path to restore from, if any
// requireHaMetadata - when true and no savepoint is given, a last-state deployment is
//                     attempted if at least one checkpoint is reported; its negation
//                     is passed on to deleteClusterDeployment
//
// NOTE(review): Preconditions.checkArgument presumably throws IllegalArgumentException
// when the job is not in a terminal state - confirm against the Preconditions in use.
public void deploy(
        FlinkResourceContext<FlinkDeployment> ctx,
        FlinkDeploymentSpec spec,
        Configuration deployConfig,
        Optional<String> savepoint,
        boolean requireHaMetadata)
        throws Exception {
    var flinkDeployment = ctx.getResource();
    var currentStatus = flinkDeployment.getStatus();
    var service = ctx.getFlinkService();

    // Forget the previously recorded cluster health info before (re)deploying.
    var clusterInfo = flinkDeployment.getStatus().getClusterInfo();
    ClusterHealthEvaluator.removeLastValidClusterHealthInfo(clusterInfo);

    if (savepoint.isPresent()) {
        // Savepoint deployment: restore from the path handed in by the caller.
        String savepointPath = savepoint.get();
        deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
    } else {
        // The checkpoint lookup is only performed when HA metadata is required.
        boolean lastStateDeployment =
                requireHaMetadata && service.atLeastOneCheckpoint(deployConfig);
        if (lastStateDeployment) {
            // Last-state deployment: a dummy savepoint path is set explicitly so that an
            // accidental, incorrect state restore is avoided if the user has deleted the
            // HA metadata.
            deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, LAST_STATE_DUMMY_SP_PATH);
            currentStatus.getJobStatus().setUpgradeSavepointPath(LAST_STATE_DUMMY_SP_PATH);
        } else {
            // Stateless deployment: drop any savepoint path the user may have configured.
            deployConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
        }
    }

    setOwnerReference(flinkDeployment, deployConfig);
    setRandomJobResultStorePath(deployConfig);

    boolean jobManagerDeploymentPresent =
            JobManagerDeploymentStatus.MISSING != currentStatus.getJobManagerDeploymentStatus();
    if (jobManagerDeploymentPresent) {
        // A leftover cluster may only be replaced once its job has terminated.
        Preconditions.checkArgument(ReconciliationUtils.isJobInTerminalState(currentStatus));
        LOG.info("Deleting cluster with terminated application before new deployment");
        service.deleteClusterDeployment(
                flinkDeployment.getMetadata(), currentStatus, deployConfig, !requireHaMetadata);
        statusRecorder.patchAndCacheStatus(flinkDeployment, ctx.getKubernetesClient());
    }

    setJobIdIfNecessary(
            flinkDeployment, deployConfig, ctx.getKubernetesClient(), requireHaMetadata);

    // Record the submission as an event before actually submitting the cluster.
    eventRecorder.triggerEvent(
            flinkDeployment,
            EventRecorder.Type.Normal,
            EventRecorder.Reason.Submit,
            EventRecorder.Component.JobManagerDeployment,
            MSG_SUBMIT,
            ctx.getKubernetesClient());
    service.submitApplicationCluster(spec.getJob(), deployConfig, requireHaMetadata);

    // Reflect the freshly submitted cluster in the in-memory status.
    currentStatus.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
    currentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);

    IngressUtils.updateIngressRules(
            flinkDeployment.getMetadata(), spec, deployConfig, ctx.getKubernetesClient());
}