in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobDefinitionResolver.java [71:207]
JobDefinition getResolvedJobDefinition(final String user, final JobDefinition givenJobDefnOp, final IJobClusterMetadata jobClusterMetadata) throws Exception {
Preconditions.checkNotNull(givenJobDefnOp, "JobDefinition cannot be null");
Preconditions.checkNotNull(jobClusterMetadata, "JobClusterMetadata cannot be null");
JobDefinition resolvedJobDefn = givenJobDefnOp;
logger.info("Given JobDefn {}", resolvedJobDefn);
// inherit params from cluster if not specified
List<Parameter> parameters = (resolvedJobDefn.getParameters() != null && !resolvedJobDefn.getParameters().isEmpty()) ? resolvedJobDefn.getParameters() : jobClusterMetadata.getJobClusterDefinition().getParameters();
// Inherit labels from cluster, if resolvedJobDefn has labels, override the existing ones
Map<String, Label> labelMap = jobClusterMetadata.getJobClusterDefinition().getLabels().stream()
.collect(Collectors.toMap(Label::getName, label -> label));
if (resolvedJobDefn.getLabels() != null && !resolvedJobDefn.getLabels().isEmpty()) {
resolvedJobDefn.getLabels()
.forEach(label -> labelMap.put(label.getName(), label));
}
List<Label> labels = Collections.unmodifiableList(new ArrayList<>(labelMap.values()));
String artifactName = resolvedJobDefn.getArtifactName();
String jobJarUrl = resolvedJobDefn.getJobJarUrl();
SchedulingInfo schedulingInfo = resolvedJobDefn.getSchedulingInfo();
String version = resolvedJobDefn.getVersion();
JobClusterConfig jobClusterConfig = null;
if(!isNull(artifactName) && !isNull(jobJarUrl) && !isNull(version) && !schedulingInfoNotValid(schedulingInfo)) {
// update cluster ?
} else if(!isNull(artifactName) && !isNull(jobJarUrl) && !isNull(version) && schedulingInfoNotValid(schedulingInfo)) { // scheduling Info is not given while new artifact is specified
// exception
String msg = String.format("Scheduling info is not specified during Job Submit for cluster %s while new artifact is specified %s. Job Submit fails", jobClusterMetadata.getJobClusterDefinition().getName(), artifactName);
logger.warn(msg);
throw new Exception(msg);
} else if(!isNull(artifactName) && !isNull(jobJarUrl)&& isNull(version) && !schedulingInfoNotValid(schedulingInfo)) { // artifact & schedulingInfo are given
// generate new version and update cluster
version = String.valueOf(System.currentTimeMillis());
// update cluster ?
} else if(!isNull(artifactName) && !isNull(jobJarUrl) && isNull(version) && schedulingInfoNotValid(schedulingInfo)) { // scheduling info not given while new artifact is specified
// exception
String msg = String.format("Scheduling info is not specified during Job Submit for cluster %s while new artifact %s is specified. Job Submit fails", jobClusterMetadata.getJobClusterDefinition().getName(), artifactName);
logger.warn(msg);
throw new Exception(msg);
} else if(isNull(artifactName) && isNull(jobJarUrl) && !isNull(version) && !schedulingInfoNotValid(schedulingInfo)) { // version is given & scheduling info is given
// fetch JobCluster config for version and validate the given schedulingInfo is compatible
Optional<JobClusterConfig> clusterConfigForVersion = getJobClusterConfigForVersion(jobClusterMetadata, version);
if(!clusterConfigForVersion.isPresent()) {
String msg = String.format("No Job Cluster config could be found for version %s in JobCluster %s. Job Submit fails", version, jobClusterMetadata.getJobClusterDefinition().getName());
logger.warn(msg);
throw new Exception(msg);
}
jobClusterConfig = clusterConfigForVersion.get();
if(!validateSchedulingInfo(schedulingInfo, jobClusterConfig.getSchedulingInfo(), jobClusterMetadata)) {
String msg = String.format("Given SchedulingInfo %s is incompatible with that associated with the given version %s in JobCluster %s. Job Submit fails", schedulingInfo, version, jobClusterMetadata.getJobClusterDefinition().getName());
logger.warn(msg);
throw new Exception(msg);
}
artifactName = jobClusterConfig.getArtifactName();
jobJarUrl = jobClusterConfig.getJobJarUrl();
} else if(isNull(artifactName) && isNull(jobJarUrl) && !isNull(version) && schedulingInfoNotValid(schedulingInfo)) { // Only version is given
// fetch JobCluster config for version
Optional<JobClusterConfig> clusterConfigForVersion = getJobClusterConfigForVersion(jobClusterMetadata, version);
if(!clusterConfigForVersion.isPresent()) {
String msg = String.format("No Job Cluster config could be found for version %s in JobCluster %s. Job Submit fails", version, jobClusterMetadata.getJobClusterDefinition().getName());
logger.warn(msg);
throw new Exception(msg);
}
jobClusterConfig = clusterConfigForVersion.get();
schedulingInfo = jobClusterConfig.getSchedulingInfo();
artifactName = jobClusterConfig.getArtifactName();
jobJarUrl = jobClusterConfig.getJobJarUrl();
} else if(isNull(artifactName) && isNull(jobJarUrl) && isNull(version) && !schedulingInfoNotValid(schedulingInfo)) { // only scheduling info is given
// fetch latest Job Cluster config
jobClusterConfig = jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig();
version = jobClusterConfig.getVersion();
artifactName = jobClusterConfig.getArtifactName();
jobJarUrl = jobClusterConfig.getJobJarUrl();
// set version to it
// validate given scheduling info is compatible
if(!validateSchedulingInfo(schedulingInfo, jobClusterConfig.getSchedulingInfo(), jobClusterMetadata)) {
String msg = String.format("Given SchedulingInfo %s is incompatible with that associated with the given version %s in JobCluster %s which is %s. Job Submit fails", schedulingInfo, version, jobClusterMetadata.getJobClusterDefinition().getName(), jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig().getSchedulingInfo());
logger.warn(msg);
throw new Exception(msg);
}
} else if(isNull(artifactName) && isNull(jobJarUrl) && isNull(version) && schedulingInfoNotValid(schedulingInfo)){ // Nothing is given. Use the latest on the cluster
// fetch latest job cluster config
jobClusterConfig = jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig();
// set version to it
version = jobClusterConfig.getVersion();
// use scheduling info from that.
schedulingInfo = jobClusterConfig.getSchedulingInfo();
artifactName = jobClusterConfig.getArtifactName();
jobJarUrl = jobClusterConfig.getJobJarUrl();
} else {
// exception should never get here.
throw new Exception(String.format("Invalid case for resolveJobDefinition artifactName %s jobJarUrl %s version %s schedulingInfo %s", jobJarUrl, artifactName, version, schedulingInfo));
}
logger.info("Resolved version {}, schedulingInfo {}, artifactName {}, jobJarUrl {}", version, schedulingInfo, artifactName, jobJarUrl);
if(isNull(artifactName) || isNull(jobJarUrl) || isNull(version) || schedulingInfoNotValid(schedulingInfo)) {
String msg = String.format(" SchedulingInfo %s or artifact %s or jobJarUrl %s or version %s could not be resolved in JobCluster %s. Job Submit fails", schedulingInfo, artifactName, jobJarUrl, version, jobClusterMetadata.getJobClusterDefinition().getName());
logger.warn(msg);
throw new Exception(msg);
}
return new JobDefinition.Builder()
.from(resolvedJobDefn)
.withParameters(parameters)
.withLabels(labels)
.withSchedulingInfo(schedulingInfo)
.withUser(user)
.withVersion(version)
.withArtifactName(artifactName)
.withJobJarUrl(jobJarUrl)
.build();
}