in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java [75:128]
/**
 * Checks whether the pending upgrade of this session job is already running on the cluster.
 *
 * <p>Lists the jobs known to the Flink service and selects the jobs that are not in a globally
 * terminal state and whose {@link JobID} lower part matches the fixed session job ID derived
 * from this resource's UID. The upper part of a matching job ID is interpreted as the
 * generation the job was deployed from.
 *
 * @param ctx resource context providing the session job and the Flink service to query
 * @return {@code true} if exactly one matching job exists and it was deployed from the upgrade
 *     target generation (the job ID in the resource status is then updated to that job);
 *     {@code false} if no matching job exists
 * @throws RuntimeException if listing the jobs fails, if more than one matching job is found,
 *     or if the single matching job was deployed from a generation other than the upgrade
 *     target generation
 */
protected boolean checkIfAlreadyUpgraded(FlinkResourceContext<FlinkSessionJob> ctx) {
    var flinkSessionJob = ctx.getResource();
    var uid = flinkSessionJob.getMetadata().getUid();
    Collection<JobStatusMessage> jobStatusMessages;
    try {
        jobStatusMessages = ctx.getFlinkService().listJobs(ctx.getObserveConfig());
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // The interrupt flag was cleared when InterruptedException was thrown; restore it
            // so callers further up can still observe the interruption after we wrap it.
            Thread.currentThread().interrupt();
        }
        throw new RuntimeException("Failed to list jobs", e);
    }

    var matchedJobs = new ArrayList<JobID>();
    for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
        var jobId = jobStatusMessage.getJobId();
        // The upper part of a session job ID is treated as its generation (see below), so
        // derive the expected ID for that same generation and compare the UID-derived lower
        // parts.
        var expectedJobId = generateSessionJobFixedJobID(uid, jobId.getUpperPart());
        if (jobId.getLowerPart() == expectedJobId.getLowerPart()
                && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
            matchedJobs.add(jobId);
        }
    }

    if (matchedJobs.isEmpty()) {
        return false;
    }
    if (matchedJobs.size() > 1) {
        // this indicates a bug, which means we have more than one running job for a single
        // SessionJob CR.
        throw new RuntimeException(
                String.format(
                        "Unexpected case: %d job found for the resource with uid: %s",
                        matchedJobs.size(), uid));
    }

    var matchedJobID = matchedJobs.get(0);
    Long upgradeTargetGeneration =
            ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
    long deployedGeneration = matchedJobID.getUpperPart();
    var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();
    // The target generation is a boxed Long; check for null explicitly so the comparison
    // cannot fail with an unboxing NullPointerException. A null target is reported as a
    // mismatch below.
    if (upgradeTargetGeneration != null && upgradeTargetGeneration == deployedGeneration) {
        LOG.info(
                "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
                oldJobID,
                matchedJobID.toHexString());
        flinkSessionJob.getStatus().getJobStatus().setJobId(matchedJobID.toHexString());
        return true;
    }

    var msg =
            String.format(
                    "Running job %s's generation %s doesn't match upgrade target generation %s.",
                    matchedJobID.toHexString(),
                    deployedGeneration,
                    upgradeTargetGeneration);
    throw new RuntimeException(msg);
}