private void validateStateSnapshot()

in flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java [104:139]


    private void validateStateSnapshot(KubernetesResource resource) {
        FlinkStateSnapshot snapshot = objectMapper.convertValue(resource, FlinkStateSnapshot.class);

        var jobRef = snapshot.getSpec().getJobReference();

        AbstractFlinkResource<?, ?> targetResource = null;
        if (jobRef != null && jobRef.getName() != null && jobRef.getKind() != null) {
            var namespace =
                    FlinkStateSnapshotUtils.getSnapshotJobReferenceResourceId(snapshot)
                            .getNamespace()
                            .orElseThrow(
                                    () ->
                                            new IllegalArgumentException(
                                                    "Cannot determine namespace for snapshot"));
            var key = Cache.namespaceKeyFunc(namespace, jobRef.getName());

            if (JobKind.FLINK_DEPLOYMENT.equals(jobRef.getKind())) {
                targetResource =
                        informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
            } else if (JobKind.FLINK_SESSION_JOB.equals(jobRef.getKind())) {
                targetResource =
                        informerManager
                                .getFlinkSessionJobInformer(namespace)
                                .getStore()
                                .getByKey(key);
            }
        }

        for (FlinkResourceValidator validator : validators) {
            Optional<String> validationError =
                    validator.validateStateSnapshot(snapshot, Optional.ofNullable(targetResource));
            if (validationError.isPresent()) {
                throw new NotAllowedException(validationError.get());
            }
        }
    }