private void handleCheckpoint()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/snapshot/StateSnapshotObserver.java [115:170]


    /**
     * Evaluates the outcome of a triggered checkpoint and, once it has finished, records the
     * snapshot resource as successful.
     *
     * <p>Returns without recording anything while either the checkpoint fetch result or the
     * follow-up checkpoint stats query is still pending. If the stats query reports an error, a
     * warning event is emitted and the snapshot is recorded as successful with an empty path.
     *
     * @param ctx context of the snapshot resource being observed
     * @param checkpointInfo result of fetching the triggered checkpoint
     * @param ctxFlinkDeployment context supplying the Flink service used to query checkpoint stats
     * @param jobId job ID forwarded to the checkpoint stats query
     * @throws ReconciliationException if {@code checkpointInfo} carries an error
     */
    private void handleCheckpoint(
            FlinkStateSnapshotContext ctx,
            CheckpointFetchResult checkpointInfo,
            FlinkResourceContext<FlinkDeployment> ctxFlinkDeployment,
            String jobId) {
        var snapshot = ctx.getResource();
        var snapshotName = snapshot.getMetadata().getName();

        // Nothing to record yet; a later observe pass will look at the checkpoint again.
        if (checkpointInfo.isPending()) {
            var triggerId = snapshot.getStatus().getTriggerId();
            LOG.debug("Checkpoint for {} with ID {} is pending", snapshotName, triggerId);
            return;
        }

        // A reported fetch error is surfaced to the caller as a reconciliation failure.
        var fetchError = checkpointInfo.getError();
        if (fetchError != null) {
            throw new ReconciliationException(fetchError);
        }

        LOG.debug("Checkpoint {} was successful, querying final checkpoint path...", snapshotName);
        var flinkService = ctxFlinkDeployment.getFlinkService();
        var checkpointId = checkpointInfo.getCheckpointId();
        var observeConfig = ctx.getReferencedJobObserveConfig();
        var stats = flinkService.fetchCheckpointStats(jobId, checkpointId, observeConfig);

        if (stats.isPending()) {
            return;
        }

        String finalPath = stats.getPath();
        if (stats.getError() != null) {
            // The checkpoint itself is treated as complete here, so a failed path lookup only
            // produces a warning event and the snapshot is recorded with an empty path.
            var warning =
                    String.format(
                            "Checkpoint %s was successful, but failed to fetch path. Flink webserver stores only a limited amount of checkpoints in its cache, try increasing '%s' config for this job.\n%s",
                            snapshotName,
                            CHECKPOINTS_HISTORY_SIZE.key(),
                            stats.getError());
            eventRecorder.triggerSnapshotEvent(
                    snapshot,
                    EventRecorder.Type.Warning,
                    EventRecorder.Reason.CheckpointError,
                    EventRecorder.Component.Snapshot,
                    warning,
                    ctx.getKubernetesClient());
            finalPath = "";
        }

        LOG.info("Checkpoint {} successful: {}", snapshotName, finalPath);
        FlinkStateSnapshotUtils.snapshotSuccessful(snapshot, finalPath, false);
    }