private void observeTriggeredSavepoint()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java [113:174]


    private void observeTriggeredSavepoint(FlinkResourceContext<CR> ctx, String jobID) {
        var resource = (AbstractFlinkResource<?, ?>) ctx.getResource();

        var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();

        LOG.debug("Observing in-progress savepoint");
        var savepointFetchResult =
                ctx.getFlinkService()
                        .fetchSavepointInfo(
                                savepointInfo.getTriggerId(), jobID, ctx.getObserveConfig());

        if (savepointFetchResult.isPending()) {
            LOG.info("Savepoint operation not finished yet...");
            return;
        }

        if (savepointFetchResult.getError() != null) {
            var err = savepointFetchResult.getError();
            Duration gracePeriod =
                    ctx.getObserveConfig()
                            .get(
                                    KubernetesOperatorConfigOptions
                                            .OPERATOR_SAVEPOINT_TRIGGER_GRACE_PERIOD);
            if (SnapshotUtils.gracePeriodEnded(gracePeriod, savepointInfo)) {
                LOG.error(
                        "Savepoint attempt failed after grace period. Won't be retried again: "
                                + err);
                ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                        savepointInfo.getTriggerType(),
                        (AbstractFlinkResource) resource,
                        SAVEPOINT);
            } else {
                LOG.warn("Savepoint failed within grace period, retrying: " + err);
            }
            eventRecorder.triggerEvent(
                    resource,
                    EventRecorder.Type.Warning,
                    EventRecorder.Reason.SavepointError,
                    EventRecorder.Component.Operator,
                    savepointInfo.formatErrorMessage(
                            resource.getSpec().getJob().getSavepointTriggerNonce()),
                    ctx.getKubernetesClient());
            savepointInfo.resetTrigger();
            return;
        }

        var savepoint =
                new Savepoint(
                        savepointInfo.getTriggerTimestamp(),
                        savepointFetchResult.getLocation(),
                        savepointInfo.getTriggerType(),
                        savepointInfo.getFormatType(),
                        SnapshotTriggerType.MANUAL == savepointInfo.getTriggerType()
                                ? resource.getSpec().getJob().getSavepointTriggerNonce()
                                : null);

        ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                savepointInfo.getTriggerType(), resource, SAVEPOINT);

        // In case of periodic and manual savepoint, we still use lastSavepoint
        savepointInfo.updateLastSavepoint(savepoint);
    }