public CheckpointFetchResult fetchCheckpointInfo()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java [655:705]


    /**
     * Queries the cluster for the status of a previously triggered checkpoint and maps the answer
     * to a {@link CheckpointFetchResult}.
     *
     * <p>The outcome is decided in this order:
     *
     * <ol>
     *   <li>no response, or a response without a resource: pending;
     *   <li>the resource carries a failure cause: error, described by the cause's {@code
     *       toString()};
     *   <li>queue status {@code IN_PROGRESS}: pending;
     *   <li>queue status {@code COMPLETED}: completed, with the reported checkpoint id;
     *   <li>any other queue status: an {@link IllegalStateException} is raised internally and, like
     *       every other exception thrown in this method, converted into an error result.
     * </ol>
     *
     * <p>This method blocks until the REST request completes and never propagates an exception to
     * the caller. If the calling thread is interrupted while waiting, the interrupt flag is
     * restored before the error result is returned.
     *
     * @param triggerId hex string of the trigger id identifying the checkpoint operation
     * @param jobId hex string of the id of the job the checkpoint belongs to
     * @param conf configuration used to obtain the cluster client
     * @return a pending, completed or error result as described above
     */
    public CheckpointFetchResult fetchCheckpointInfo(
            String triggerId, String jobId, Configuration conf) {
        LOG.info("Fetching checkpoint result with triggerId: {}", triggerId);
        try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
            CheckpointStatusHeaders checkpointStatusHeaders = CheckpointStatusHeaders.getInstance();
            CheckpointStatusMessageParameters checkpointStatusMessageParameters =
                    checkpointStatusHeaders.getUnresolvedMessageParameters();
            checkpointStatusMessageParameters.jobIdPathParameter.resolve(
                    JobID.fromHexString(jobId));
            checkpointStatusMessageParameters.triggerIdPathParameter.resolve(
                    TriggerId.fromHexString(triggerId));
            CompletableFuture<AsynchronousOperationResult<CheckpointInfo>> response =
                    clusterClient.sendRequest(
                            checkpointStatusHeaders,
                            checkpointStatusMessageParameters,
                            EmptyRequestBody.getInstance());

            // Wait for the response exactly once and work on the resolved value afterwards.
            AsynchronousOperationResult<CheckpointInfo> operationResult = response.get();
            if (operationResult == null) {
                return CheckpointFetchResult.pending();
            }

            CheckpointInfo checkpointInfo = operationResult.resource();
            if (checkpointInfo == null) {
                return CheckpointFetchResult.pending();
            }

            // A reported failure takes precedence over the queue status.
            if (checkpointInfo.getFailureCause() != null) {
                LOG.error(
                        "Failure occurred while fetching the checkpoint result",
                        checkpointInfo.getFailureCause());
                return CheckpointFetchResult.error(checkpointInfo.getFailureCause().toString());
            }

            QueueStatus.Id operationStatus = operationResult.queueStatus().getId();
            switch (operationStatus) {
                case IN_PROGRESS:
                    return CheckpointFetchResult.pending();
                case COMPLETED:
                    LOG.info(
                            "Checkpoint {} triggered by the operator for job {} completed:",
                            triggerId,
                            jobId);
                    return CheckpointFetchResult.completed(checkpointInfo.getCheckpointId());
                default:
                    // Caught by the handler below and reported as an error result.
                    throw new IllegalStateException(
                            String.format(
                                    "Checkpoint %s for job %s is reported to be in an unknown status: %s",
                                    triggerId, jobId, operationStatus.name()));
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag so callers further up the
                // stack can still observe it.
                Thread.currentThread().interrupt();
            }
            LOG.error("Exception while fetching the checkpoint result", e);
            // getMessage() may be null (e.g. for a bare NullPointerException); fall back to
            // toString() so the error result always carries a description.
            String errorMessage = e.getMessage() != null ? e.getMessage() : e.toString();
            return CheckpointFetchResult.error(errorMessage);
        }
    }