in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java [483:516]
public Optional<String> validateStateSnapshot(
        FlinkStateSnapshot snapshot, Optional<AbstractFlinkResource<?, ?>> target) {
    // Validates a state snapshot resource. Returns the error message of the first failed
    // check, or an empty Optional when no check fails. `target` is the resource the snapshot
    // refers to; an empty `target` means it could not be found.
    var spec = snapshot.getSpec();

    // A snapshot must be configured as exactly one of savepoint or checkpoint; both flags
    // being equal means either none or both were set.
    boolean isSavepoint = spec.isSavepoint();
    boolean isCheckpoint = spec.isCheckpoint();
    if (isSavepoint == isCheckpoint) {
        return Optional.of(
                "Exactly one of checkpoint or savepoint configurations has to be set.");
    }

    // The remaining checks are not required if the savepoint already exists.
    // Boolean.TRUE.equals avoids a NullPointerException from auto-unboxing when the
    // alreadyExists flag is null; a null flag is treated as "does not already exist".
    if (isSavepoint && Boolean.TRUE.equals(spec.getSavepoint().getAlreadyExists())) {
        return Optional.empty();
    }

    if (spec.getJobReference() == null) {
        return Optional.of("Job reference must be supplied for this snapshot");
    }

    // If the savepoint has already been processed by the operator, we don't need to check
    // the job reference. A missing target is therefore only reported while the snapshot has
    // no status yet or is still in the TRIGGER_PENDING state.
    if (target.isEmpty()) {
        var status = snapshot.getStatus();
        if (status == null
                || FlinkStateSnapshotStatus.State.TRIGGER_PENDING.equals(status.getState())) {
            var resourceId =
                    FlinkStateSnapshotUtils.getSnapshotJobReferenceResourceId(snapshot);
            return Optional.of(
                    String.format(
                            "Target for snapshot %s/%s was not found",
                            resourceId.getNamespace().orElse(null), resourceId.getName()));
        }
    }
    return Optional.empty();
}