in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/JobsRoute.java [620:731]
private Pair<Boolean, String> validateSubmitJobRequest(
MantisJobDefinition mjd,
Optional<String> clusterNameInResource) {
if (null == mjd) {
logger.error("rejecting job submit request, job definition is malformed {}", mjd);
return Pair.apply(false, "Malformed job definition.");
}
// must include job cluster name
if (mjd.getName() == null || mjd.getName().length() == 0) {
logger.info("rejecting job submit request, must include name {}", mjd);
return Pair.apply(false, "Job definition must include name");
}
// validate specified job cluster name matches with what specified in REST resource endpoint
if (clusterNameInResource.isPresent()) {
if (!clusterNameInResource.get().equals(mjd.getName())) {
String msg = String.format("Cluster name specified in request payload [%s] " +
"does not match with what specified in resource endpoint [%s]",
mjd.getName(), clusterNameInResource.get());
logger.info("rejecting job submit request, {} {}", msg, mjd);
return Pair.apply(false, msg);
}
}
// validate scheduling info
SchedulingInfo schedulingInfo = mjd.getSchedulingInfo();
if (schedulingInfo != null) {
Map<Integer, StageSchedulingInfo> stages = schedulingInfo.getStages();
if (stages != null) {
for (StageSchedulingInfo stageSchedInfo : stages.values()) {
double cpuCores = stageSchedInfo.getMachineDefinition().getCpuCores();
int maxCpuCores = ConfigurationProvider.getConfig()
.getWorkerMachineDefinitionMaxCpuCores();
if (cpuCores > maxCpuCores) {
logger.info(
"rejecting job submit request, requested CPU {} > max for {} (user: {}) (stage: {})",
cpuCores,
mjd.getName(),
mjd.getUser(),
stages);
return Pair.apply(
false,
"requested CPU cannot be more than max CPU per worker " +
maxCpuCores);
}
double memoryMB = stageSchedInfo.getMachineDefinition().getMemoryMB();
int maxMemoryMB = ConfigurationProvider.getConfig()
.getWorkerMachineDefinitionMaxMemoryMB();
if (memoryMB > maxMemoryMB) {
logger.info(
"rejecting job submit request, requested memory {} > max for {} (user: {}) (stage: {})",
memoryMB,
mjd.getName(),
mjd.getUser(),
stages);
return Pair.apply(
false,
"requested memory cannot be more than max memoryMB per worker " +
maxMemoryMB);
}
double networkMbps = stageSchedInfo.getMachineDefinition().getNetworkMbps();
int maxNetworkMbps = ConfigurationProvider.getConfig()
.getWorkerMachineDefinitionMaxNetworkMbps();
if (networkMbps > maxNetworkMbps) {
logger.info(
"rejecting job submit request, requested network {} > max for {} (user: {}) (stage: {})",
networkMbps,
mjd.getName(),
mjd.getUser(),
stages);
return Pair.apply(
false,
"requested network cannot be more than max networkMbps per worker " +
maxNetworkMbps);
}
int numberOfInstances = stageSchedInfo.getNumberOfInstances();
int maxWorkersPerStage = ConfigurationProvider.getConfig()
.getMaxWorkersPerStage();
if (numberOfInstances > maxWorkersPerStage) {
logger.info(
"rejecting job submit request, requested num instances {} > max for {} (user: {}) (stage: {})",
numberOfInstances,
mjd.getName(),
mjd.getUser(),
stages);
return Pair.apply(
false,
"requested number of instances per stage cannot be more than " +
maxWorkersPerStage);
}
StageScalingPolicy scalingPolicy = stageSchedInfo.getScalingPolicy();
if (scalingPolicy != null) {
if (scalingPolicy.getMax() > maxWorkersPerStage) {
logger.info(
"rejecting job submit request, requested num instances in scaling policy {} > max for {} (user: {}) (stage: {})",
numberOfInstances,
mjd.getName(),
mjd.getUser(),
stages);
return Pair.apply(
false,
"requested number of instances per stage in scaling policy cannot be more than " +
maxWorkersPerStage);
}
}
}
}
}
return Pair.apply(true, "");
}