in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java [93:158]
/**
 * Observes the JobManager Kubernetes {@code Deployment} of the resource and records the
 * resulting {@link JobManagerDeploymentStatus} on the resource status.
 *
 * <p>Outcomes, in the order they are evaluated:
 *
 * <ul>
 *   <li>suspended job: nothing is changed;
 *   <li>previous status {@code DEPLOYED_NOT_READY}: status becomes {@code READY};
 *   <li>no Deployment found: status becomes {@code MISSING}, the job state is set to {@code
 *       RECONCILING}, and {@code onMissingDeployment} is invoked unless the previous status
 *       was already {@code MISSING} or {@code ERROR};
 *   <li>Deployment found with all desired replicas available and the JobManager port
 *       reachable: status becomes {@code DEPLOYED_NOT_READY};
 *   <li>Deployment found but a failure check throws: the job state is set to {@code
 *       RECONCILING} and the exception is rethrown unless the status is already {@code
 *       ERROR};
 *   <li>otherwise: status becomes {@code DEPLOYING}.
 * </ul>
 *
 * @param ctx resource context giving access to the FlinkDeployment, the operator SDK context,
 *     the Flink service and the observe configuration
 * @throws DeploymentFailedException if a failure check fails while the JobManager deployment
 *     status is not already {@code ERROR}
 */
protected void observeJmDeployment(FlinkResourceContext<FlinkDeployment> ctx) {
    var resource = ctx.getResource();
    FlinkDeploymentStatus resourceStatus = resource.getStatus();
    JobManagerDeploymentStatus lastJmStatus = resourceStatus.getJobManagerDeploymentStatus();

    if (isSuspendedJob(resource)) {
        logger.debug("Skipping observe step for suspended application deployments");
        return;
    }

    logger.info("Observing JobManager deployment. Previous status: {}", lastJmStatus.name());

    // Within this method DEPLOYED_NOT_READY is only set after the port check succeeded, so
    // the following observation promotes the deployment to READY without re-checking.
    if (lastJmStatus == JobManagerDeploymentStatus.DEPLOYED_NOT_READY) {
        logger.info("JobManager deployment is ready");
        resourceStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        return;
    }

    Optional<Deployment> maybeDeployment =
            ctx.getJosdkContext().getSecondaryResource(Deployment.class);

    if (!maybeDeployment.isPresent()) {
        // No Deployment at all: flag it as missing and let reconciliation take over.
        resourceStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        resourceStatus.getJobStatus().setState(JobStatus.RECONCILING.name());

        // Only report the missing deployment the first time it is noticed.
        boolean alreadyReported =
                lastJmStatus == JobManagerDeploymentStatus.MISSING
                        || lastJmStatus == JobManagerDeploymentStatus.ERROR;
        if (!alreadyReported) {
            onMissingDeployment(ctx);
        }
        return;
    }

    Deployment jmDeployment = maybeDeployment.get();
    DeploymentStatus k8sStatus = jmDeployment.getStatus();
    DeploymentSpec k8sSpec = jmDeployment.getSpec();

    // The spec is only consulted once the status (and its available replica count) is known
    // to be present; the conjunction below short-circuits in that order.
    boolean allReplicasAvailable =
            k8sStatus != null
                    && k8sStatus.getAvailableReplicas() != null
                    && k8sSpec.getReplicas().intValue() == k8sStatus.getReplicas()
                    && k8sSpec.getReplicas().intValue() == k8sStatus.getAvailableReplicas();

    if (allReplicasAvailable
            && ctx.getFlinkService().isJobManagerPortReady(ctx.getObserveConfig())) {
        // The REST server usually needs a few more seconds after the port opens.
        logger.info("JobManager deployment port is ready, waiting for the Flink REST API...");
        resourceStatus.setJobManagerDeploymentStatus(
                JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
        return;
    }

    try {
        checkFailedCreate(k8sStatus);
        // Inspecting the pod is expensive, so it is done only while the deployment is not
        // ready yet.
        checkContainerBackoff(ctx);
    } catch (DeploymentFailedException failure) {
        resourceStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
        // Rethrow only if the error has not been recorded yet; once in ERROR the exception
        // is suppressed to allow for a spec update.
        if (resourceStatus.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.ERROR) {
            throw failure;
        }
        return;
    }

    logger.info("JobManager is being deployed");
    resourceStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}