in flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java [104:139]
private void validateStateSnapshot(KubernetesResource resource) {
FlinkStateSnapshot snapshot = objectMapper.convertValue(resource, FlinkStateSnapshot.class);
var jobRef = snapshot.getSpec().getJobReference();
AbstractFlinkResource<?, ?> targetResource = null;
if (jobRef != null && jobRef.getName() != null && jobRef.getKind() != null) {
var namespace =
FlinkStateSnapshotUtils.getSnapshotJobReferenceResourceId(snapshot)
.getNamespace()
.orElseThrow(
() ->
new IllegalArgumentException(
"Cannot determine namespace for snapshot"));
var key = Cache.namespaceKeyFunc(namespace, jobRef.getName());
if (JobKind.FLINK_DEPLOYMENT.equals(jobRef.getKind())) {
targetResource =
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
} else if (JobKind.FLINK_SESSION_JOB.equals(jobRef.getKind())) {
targetResource =
informerManager
.getFlinkSessionJobInformer(namespace)
.getStore()
.getByKey(key);
}
}
for (FlinkResourceValidator validator : validators) {
Optional<String> validationError =
validator.validateStateSnapshot(snapshot, Optional.ofNullable(targetResource));
if (validationError.isPresent()) {
throw new NotAllowedException(validationError.get());
}
}
}