in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java [183:225]
private Optional<String> triggerCheckpointOrSavepoint(
FlinkStateSnapshotSpec spec, FlinkStateSnapshotContext ctx, String jobId)
throws Exception {
var flinkDeploymentContext =
ctxFactory.getResourceContext(
ctx.getReferencedJobFlinkDeployment(), ctx.getJosdkContext());
var flinkService = flinkDeploymentContext.getFlinkService();
var conf =
Preconditions.checkNotNull(
flinkDeploymentContext.getObserveConfig(),
String.format(
"Observe config was null for %s",
flinkDeploymentContext.getResource().getMetadata().getName()));
if (spec.isSavepoint()) {
var path =
ObjectUtils.firstNonNull(
spec.getSavepoint().getPath(),
conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
if (path == null) {
throw new IllegalArgumentException(
String.format(
"Either the savepoint path in the spec or configuration %s in the Flink resource has to be supplied",
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
}
return Optional.of(
flinkService.triggerSavepoint(
jobId,
org.apache.flink.core.execution.SavepointFormatType.valueOf(
spec.getSavepoint().getFormatType().name()),
path,
conf));
} else if (spec.isCheckpoint()) {
if (!SnapshotUtils.isSnapshotTriggeringSupported(conf)) {
throw new IllegalArgumentException(
"Manual checkpoint triggering is not supported for this Flink job (requires Flink 1.17+)");
}
return Optional.of(flinkService.triggerCheckpoint(jobId, CheckpointType.FULL, conf));
} else {
throw new IllegalArgumentException(
"Snapshot must specify either savepoint or checkpoint spec");
}
}