private void observeTriggeredCheckpoint()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java [155:210]


    private void observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String jobID) {
        var resource = (AbstractFlinkResource<?, ?>) ctx.getResource();

        CheckpointInfo checkpointInfo = resource.getStatus().getJobStatus().getCheckpointInfo();

        LOG.info("Observing checkpoint status.");
        var checkpointFetchResult =
                ctx.getFlinkService()
                        .fetchCheckpointInfo(
                                checkpointInfo.getTriggerId(), jobID, ctx.getObserveConfig());

        if (checkpointFetchResult.isPending()) {
            LOG.info("Checkpoint operation not finished yet...");
            return;
        }

        if (checkpointFetchResult.getError() != null) {
            var err = checkpointFetchResult.getError();
            Duration gracePeriod =
                    ctx.getObserveConfig()
                            .get(
                                    KubernetesOperatorConfigOptions
                                            .OPERATOR_CHECKPOINT_TRIGGER_GRACE_PERIOD);
            if (SnapshotUtils.gracePeriodEnded(gracePeriod, checkpointInfo)) {
                LOG.error(
                        "Checkpoint attempt failed after grace period. Won't be retried again: "
                                + err);
                ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                        checkpointInfo, (AbstractFlinkResource) resource, CHECKPOINT);
            } else {
                LOG.warn("Checkpoint failed within grace period, retrying: " + err);
            }
            eventRecorder.triggerEvent(
                    resource,
                    EventRecorder.Type.Warning,
                    EventRecorder.Reason.CheckpointError,
                    EventRecorder.Component.Operator,
                    checkpointInfo.formatErrorMessage(
                            resource.getSpec().getJob().getCheckpointTriggerNonce()));
            checkpointInfo.resetTrigger();
            return;
        }

        var checkpoint =
                new Checkpoint(
                        checkpointInfo.getTriggerTimestamp(),
                        checkpointInfo.getTriggerType(),
                        checkpointInfo.getFormatType(),
                        SnapshotTriggerType.MANUAL == checkpointInfo.getTriggerType()
                                ? resource.getSpec().getJob().getCheckpointTriggerNonce()
                                : null);

        ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                checkpointInfo, resource, CHECKPOINT);
        checkpointInfo.updateLastCheckpoint(checkpoint);
    }