private boolean triggerSnapshotIfNeeded()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java [468:559]


    private boolean triggerSnapshotIfNeeded(FlinkResourceContext<CR> ctx, SnapshotType snapshotType)
            throws Exception {
        var resource = ctx.getResource();
        var conf = ctx.getObserveConfig();

        var lastTrigger =
                snapshotTriggerTimestampStore.getLastPeriodicTriggerInstant(
                        resource,
                        snapshotType,
                        FlinkStateSnapshotUtils.getFlinkStateSnapshotsSupplier(ctx));

        var triggerOpt =
                SnapshotUtils.shouldTriggerSnapshot(resource, conf, snapshotType, lastTrigger);
        if (triggerOpt.isEmpty()) {
            return false;
        }
        var triggerType = triggerOpt.get();

        if (SnapshotTriggerType.PERIODIC.equals(triggerType)) {
            snapshotTriggerTimestampStore.updateLastPeriodicTriggerTimestamp(
                    resource, snapshotType, Instant.now());
        }

        var createSnapshotResource =
                FlinkStateSnapshotUtils.isSnapshotResourceEnabled(ctx.getOperatorConfig(), conf);

        String jobId = resource.getStatus().getJobStatus().getJobId();
        switch (snapshotType) {
            case SAVEPOINT:
                var savepointFormatType =
                        conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
                var savepointDirectory =
                        Preconditions.checkNotNull(
                                conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));

                if (createSnapshotResource) {
                    FlinkStateSnapshotUtils.createSavepointResource(
                            ctx.getKubernetesClient(),
                            resource,
                            savepointDirectory,
                            triggerType,
                            SavepointFormatType.valueOf(savepointFormatType.name()),
                            conf.getBoolean(
                                    KubernetesOperatorConfigOptions
                                            .OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE));

                    ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                            triggerType, resource, SAVEPOINT);
                } else {
                    var triggerId =
                            ctx.getFlinkService()
                                    .triggerSavepoint(
                                            jobId, savepointFormatType, savepointDirectory, conf);
                    resource.getStatus()
                            .getJobStatus()
                            .getSavepointInfo()
                            .setTrigger(
                                    triggerId,
                                    triggerType,
                                    SavepointFormatType.valueOf(savepointFormatType.name()));
                }

                break;
            case CHECKPOINT:
                if (createSnapshotResource) {
                    FlinkStateSnapshotUtils.createCheckpointResource(
                            ctx.getKubernetesClient(), resource, triggerType);

                    ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                            triggerType, resource, CHECKPOINT);
                } else {
                    var checkpointType =
                            conf.get(KubernetesOperatorConfigOptions.OPERATOR_CHECKPOINT_TYPE);
                    var triggerId =
                            ctx.getFlinkService()
                                    .triggerCheckpoint(
                                            jobId,
                                            org.apache.flink.core.execution.CheckpointType.valueOf(
                                                    checkpointType.name()),
                                            conf);
                    resource.getStatus()
                            .getJobStatus()
                            .getCheckpointInfo()
                            .setTrigger(triggerId, triggerType, checkpointType);
                }

                break;
            default:
                throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
        }
        return true;
    }