in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java [240:296]
private Optional<String> validateJobSpec(
JobSpec job, @Nullable TaskManagerSpec tm, Map<String, String> confMap) {
if (job == null) {
return Optional.empty();
}
Configuration configuration = Configuration.fromMap(confMap);
if (job.getUpgradeMode() != UpgradeMode.STATELESS) {
if (StringUtils.isNullOrWhitespaceOnly(
configuration.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY))) {
return Optional.of(
String.format(
"Checkpoint directory[%s] must be defined for last-state and savepoint upgrade modes",
CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()));
}
}
if (StringUtils.isNullOrWhitespaceOnly(
configuration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY))) {
if (job.getUpgradeMode() == UpgradeMode.SAVEPOINT) {
return Optional.of(
String.format(
"Job could not be upgraded with savepoint while config key[%s] is not set",
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
} else if (job.getSavepointTriggerNonce() != null) {
return Optional.of(
String.format(
"Savepoint could not be manually triggered for the running job while config key[%s] is not set",
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
} else if (configuration.contains(
KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)) {
return Optional.of(
String.format(
"Periodic savepoints cannot be enabled when config key[%s] is not set",
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
} else if (configuration.get(
KubernetesOperatorConfigOptions
.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE)
!= null) {
return Optional.of(
String.format(
"In order to use max-checkpoint age functionality config key[%s] must be set to allow triggering savepoint upgrades.",
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
}
}
var tmReplicasDefined = tm != null && tm.getReplicas() != null;
if (tmReplicasDefined && tm.getReplicas() < 1) {
return Optional.of("TaskManager replicas must be larger than 0");
} else if (!tmReplicasDefined && job.getParallelism() < 1) {
return Optional.of("Job parallelism must be larger than 0");
}
return Optional.empty();
}