protected boolean checkIfAlreadyUpgraded()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java [75:128]


    /**
     * Checks whether a pending upgrade of the session job has already been deployed to the
     * cluster, and if so records the running job's ID in the resource status.
     *
     * <p>The jobs currently known to the Flink service are listed and filtered down to those that
     * are not in a globally terminal state and whose {@code JobID} lower part equals the lower part
     * of the ID produced by {@code generateSessionJobFixedJobID} for this resource's uid. The upper
     * part of the single matching {@code JobID} is then treated as the deployed generation and
     * compared against the upgrade target generation of the resource.
     *
     * @param ctx resource context giving access to the session job, the Flink service and the
     *     observe configuration
     * @return {@code true} if exactly one matching job is running and its generation equals the
     *     upgrade target generation (the status job ID is updated in that case); {@code false} if
     *     no matching job is found
     * @throws RuntimeException if listing the jobs fails, if more than one matching job is found,
     *     or if the matching job's generation differs from the upgrade target generation (this
     *     includes the case where no upgrade target generation is available)
     */
    protected boolean checkIfAlreadyUpgraded(FlinkResourceContext<FlinkSessionJob> ctx) {
        var flinkSessionJob = ctx.getResource();
        var uid = flinkSessionJob.getMetadata().getUid();
        Collection<JobStatusMessage> jobStatusMessages;
        try {
            jobStatusMessages = ctx.getFlinkService().listJobs(ctx.getObserveConfig());
        } catch (Exception e) {
            throw new RuntimeException("Failed to list jobs", e);
        }

        // Collect every non-terminal job whose ID lower part matches the one generated for this
        // resource's uid.
        // NOTE(review): the generation argument passed to generateSessionJobFixedJobID presumably
        // does not influence the lower part of the generated ID -- confirm against the helper.
        var matchedJobs = new ArrayList<JobID>();
        for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
            var jobId = jobStatusMessage.getJobId();
            if (jobId.getLowerPart()
                            == generateSessionJobFixedJobID(uid, jobId.getUpperPart() + 1L)
                                    .getLowerPart()
                    && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
                matchedJobs.add(jobId);
            }
        }

        if (matchedJobs.isEmpty()) {
            return false;
        } else if (matchedJobs.size() > 1) {
            // this indicates a bug, which means we have more than one running job for a single
            // SessionJob CR.
            throw new RuntimeException(
                    String.format(
                            "Unexpected case: %d job found for the resource with uid: %s",
                            matchedJobs.size(), uid));
        } else {
            var matchedJobID = matchedJobs.get(0);
            Long upgradeTargetGeneration =
                    ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
            long deployedGeneration = matchedJobID.getUpperPart();
            var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();

            // upgradeTargetGeneration is a boxed Long: comparing it to a primitive with == would
            // auto-unbox and throw a bare NullPointerException when it is null. Guard explicitly
            // so a missing target generation is reported through the descriptive error below.
            if (upgradeTargetGeneration != null && upgradeTargetGeneration == deployedGeneration) {
                LOG.info(
                        "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
                        oldJobID,
                        matchedJobID.toHexString());
                flinkSessionJob.getStatus().getJobStatus().setJobId(matchedJobID.toHexString());
                return true;
            } else {
                var msg =
                        String.format(
                                "Running job %s's generation %s doesn't match upgrade target generation %s.",
                                matchedJobID.toHexString(),
                                deployedGeneration,
                                upgradeTargetGeneration);
                throw new RuntimeException(msg);
            }
        }
    }