protected static Optional shouldTriggerSnapshot()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java [178:256]


    protected static Optional<SnapshotTriggerType> shouldTriggerSnapshot(
            AbstractFlinkResource<?, ?> resource, Configuration conf, SnapshotType snapshotType) {

        var status = resource.getStatus();
        var jobStatus = status.getJobStatus();
        var jobSpec = resource.getSpec().getJob();

        if (!ReconciliationUtils.isJobRunning(status)) {
            return Optional.empty();
        }

        var reconciledJobSpec =
                status.getReconciliationStatus().deserializeLastReconciledSpec().getJob();

        // Values that are specific to the snapshot type
        Long triggerNonce;
        Long reconciledTriggerNonce;
        boolean inProgress;
        SnapshotInfo snapshotInfo;
        Duration interval;

        switch (snapshotType) {
            case SAVEPOINT:
                triggerNonce = jobSpec.getSavepointTriggerNonce();
                reconciledTriggerNonce = reconciledJobSpec.getSavepointTriggerNonce();
                inProgress = savepointInProgress(jobStatus);
                snapshotInfo = jobStatus.getSavepointInfo();
                interval = conf.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
                break;
            case CHECKPOINT:
                triggerNonce = jobSpec.getCheckpointTriggerNonce();
                reconciledTriggerNonce = reconciledJobSpec.getCheckpointTriggerNonce();
                inProgress = checkpointInProgress(jobStatus);
                snapshotInfo = jobStatus.getCheckpointInfo();
                interval = conf.get(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL);
                if (!isCheckpointsTriggeringSupported(conf)) {
                    LOG.warn(
                            "Periodic checkpoints triggering every {} is configured, "
                                    + "but not supported (requires Flink 1.17+)",
                            interval);
                    return Optional.empty();
                }
                break;
            default:
                throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
        }

        if (inProgress) {
            return Optional.empty();
        }

        var triggerNonceChanged =
                triggerNonce != null && !triggerNonce.equals(reconciledTriggerNonce);
        if (triggerNonceChanged) {
            return Optional.of(SnapshotTriggerType.MANUAL);
        }

        if (interval.isZero()) {
            return Optional.empty();
        }

        var lastTriggerTs = snapshotInfo.getLastPeriodicTriggerTimestamp();

        // When the resource is first created/periodic snapshotting enabled we have to compare
        // against the creation timestamp for triggering the first periodic savepoint
        var lastTrigger =
                lastTriggerTs == 0
                        ? Instant.parse(resource.getMetadata().getCreationTimestamp())
                        : Instant.ofEpochMilli(lastTriggerTs);
        var now = Instant.now();
        if (lastTrigger.plus(interval).isBefore(Instant.now())) {
            LOG.info(
                    "Triggering new periodic {} after {}",
                    snapshotType.toString().toLowerCase(),
                    Duration.between(lastTrigger, now));
            return Optional.of(SnapshotTriggerType.PERIODIC);
        }
        return Optional.empty();
    }