public static Optional shouldTriggerSnapshot()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java [141:209]


    public static Optional<SnapshotTriggerType> shouldTriggerSnapshot(
            AbstractFlinkResource<?, ?> resource,
            Configuration conf,
            SnapshotType snapshotType,
            Instant lastTrigger) {

        var status = resource.getStatus();
        var jobStatus = status.getJobStatus();
        var jobSpec = resource.getSpec().getJob();

        if (!ReconciliationUtils.isJobRunning(status)) {
            return Optional.empty();
        }

        var reconciledJobSpec =
                status.getReconciliationStatus().deserializeLastReconciledSpec().getJob();

        // Values that are specific to the snapshot type
        Long triggerNonce;
        Long reconciledTriggerNonce;
        boolean inProgress;
        String automaticTriggerExpression;

        switch (snapshotType) {
            case SAVEPOINT:
                triggerNonce = jobSpec.getSavepointTriggerNonce();
                reconciledTriggerNonce = reconciledJobSpec.getSavepointTriggerNonce();
                inProgress = savepointInProgress(jobStatus);
                automaticTriggerExpression =
                        conf.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
                break;
            case CHECKPOINT:
                triggerNonce = jobSpec.getCheckpointTriggerNonce();
                reconciledTriggerNonce = reconciledJobSpec.getCheckpointTriggerNonce();
                inProgress = checkpointInProgress(jobStatus);
                automaticTriggerExpression =
                        conf.get(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL);
                break;
            default:
                throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
        }

        if (inProgress) {
            return Optional.empty();
        }

        var triggerNonceChanged =
                triggerNonce != null && !triggerNonce.equals(reconciledTriggerNonce);
        if (triggerNonceChanged) {
            if (snapshotType == CHECKPOINT && !isSnapshotTriggeringSupported(conf)) {
                LOG.warn(
                        "Manual checkpoint triggering is attempted, but is not supported (requires Flink 1.17+)");
                return Optional.empty();
            } else {
                return Optional.of(SnapshotTriggerType.MANUAL);
            }
        }

        if (shouldTriggerAutomaticSnapshot(snapshotType, automaticTriggerExpression, lastTrigger)) {
            if (snapshotType == CHECKPOINT && !isSnapshotTriggeringSupported(conf)) {
                LOG.warn(
                        "Automatic checkpoints triggering is configured but is not supported (requires Flink 1.17+)");
                return Optional.empty();
            } else {
                return Optional.of(SnapshotTriggerType.PERIODIC);
            }
        }
        return Optional.empty();
    }