private Optional<String> triggerCheckpointOrSavepoint(...)

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java [183:225]


    /**
     * Triggers either a savepoint or a checkpoint for the given job, depending on which kind of
     * snapshot the spec describes.
     *
     * <p>The Flink service and the observe config are taken from the resource context of the
     * Flink deployment referenced by {@code ctx}. For a savepoint, the target path is the one set
     * in the spec, falling back to {@code CheckpointingOptions.SAVEPOINT_DIRECTORY} read from the
     * observe config. For a checkpoint, a {@code CheckpointType.FULL} checkpoint is requested.
     *
     * @param spec snapshot spec; consulted via {@code isSavepoint()} / {@code isCheckpoint()}
     * @param ctx snapshot context providing the referenced Flink deployment and JOSDK context
     * @param jobId id of the job, passed through unchanged to the Flink service
     * @return the value returned by the Flink service trigger call, wrapped in an {@link
     *     Optional} (presumably the trigger id -- confirm against the Flink service contract)
     * @throws IllegalArgumentException if no savepoint path can be resolved, if checkpoint
     *     triggering is not supported for the resolved config, or if the spec is neither a
     *     savepoint nor a checkpoint
     * @throws Exception anything propagated from the context factory or the Flink service
     */
    private Optional<String> triggerCheckpointOrSavepoint(
            FlinkStateSnapshotSpec spec, FlinkStateSnapshotContext ctx, String jobId)
            throws Exception {
        var deploymentCtx =
                ctxFactory.getResourceContext(
                        ctx.getReferencedJobFlinkDeployment(), ctx.getJosdkContext());
        var service = deploymentCtx.getFlinkService();

        // The observe config must be present; the message names the deployment resource.
        var observeConfig = deploymentCtx.getObserveConfig();
        var missingConfigMessage =
                String.format(
                        "Observe config was null for %s",
                        deploymentCtx.getResource().getMetadata().getName());
        var conf = Preconditions.checkNotNull(observeConfig, missingConfigMessage);

        if (spec.isSavepoint()) {
            var savepointSpec = spec.getSavepoint();

            // An explicit path in the spec wins over the configured savepoint directory.
            var targetPath =
                    ObjectUtils.firstNonNull(
                            savepointSpec.getPath(),
                            conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
            if (targetPath == null) {
                throw new IllegalArgumentException(
                        String.format(
                                "Either the savepoint path in the spec or configuration %s in the Flink resource has to be supplied",
                                CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
            }

            // Map the spec's format type onto Flink's enum by constant name.
            var flinkFormatType =
                    org.apache.flink.core.execution.SavepointFormatType.valueOf(
                            savepointSpec.getFormatType().name());
            return Optional.of(
                    service.triggerSavepoint(jobId, flinkFormatType, targetPath, conf));
        }

        if (!spec.isCheckpoint()) {
            throw new IllegalArgumentException(
                    "Snapshot must specify either savepoint or checkpoint spec");
        }

        if (!SnapshotUtils.isSnapshotTriggeringSupported(conf)) {
            throw new IllegalArgumentException(
                    "Manual checkpoint triggering is not supported for this Flink job (requires Flink 1.17+)");
        }
        return Optional.of(service.triggerCheckpoint(jobId, CheckpointType.FULL, conf));
    }