private Optional validateJobSpec()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java [240:296]


    /**
     * Validates the job section of a spec against the supplied configuration.
     *
     * <p>Checks run in a fixed order and the first failure wins: the checkpoint directory (for
     * every upgrade mode other than {@code STATELESS}), then the features that need a savepoint
     * directory, and finally TaskManager replicas or, when replicas are not defined, the job
     * parallelism.
     *
     * @param job job spec to validate; {@code null} is accepted and skips every check
     * @param tm TaskManager spec, may be {@code null}
     * @param confMap configuration entries, read through {@code Configuration.fromMap}
     * @return the message of the first failed check, or an empty {@code Optional} if none failed
     */
    private Optional<String> validateJobSpec(
            JobSpec job, @Nullable TaskManagerSpec tm, Map<String, String> confMap) {
        if (job == null) {
            return Optional.empty();
        }

        final Configuration conf = Configuration.fromMap(confMap);
        final var upgradeMode = job.getUpgradeMode();

        // Every upgrade mode except STATELESS needs a checkpoint directory.
        if (upgradeMode != UpgradeMode.STATELESS
                && StringUtils.isNullOrWhitespaceOnly(
                        conf.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY))) {
            return Optional.of(
                    String.format(
                            "Checkpoint directory[%s] must be defined for last-state and savepoint upgrade modes",
                            CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()));
        }

        final boolean savepointDirMissing =
                StringUtils.isNullOrWhitespaceOnly(
                        conf.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY));

        if (savepointDirMissing) {
            // Pick the message of the first feature that depends on the savepoint directory;
            // the chain keeps the original precedence between the four cases.
            String template = null;
            if (upgradeMode == UpgradeMode.SAVEPOINT) {
                template =
                        "Job could not be upgraded with savepoint while config key[%s] is not set";
            } else if (job.getSavepointTriggerNonce() != null) {
                template =
                        "Savepoint could not be manually triggered for the running job while config key[%s] is not set";
            } else if (conf.contains(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)) {
                template = "Periodic savepoints cannot be enabled when config key[%s] is not set";
            } else if (conf.get(
                            KubernetesOperatorConfigOptions
                                    .OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE)
                    != null) {
                template =
                        "In order to use max-checkpoint age functionality config key[%s] must be set to allow triggering savepoint upgrades.";
            }

            if (template != null) {
                return Optional.of(
                        String.format(template, CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
            }
        }

        // Replicas are validated when they are defined; otherwise parallelism is validated.
        final boolean replicasDefined = tm != null && tm.getReplicas() != null;
        if (replicasDefined) {
            if (tm.getReplicas() < 1) {
                return Optional.of("TaskManager replicas must be larger than 0");
            }
        } else if (job.getParallelism() < 1) {
            return Optional.of("Job parallelism must be larger than 0");
        }

        return Optional.empty();
    }