JobDefinition getResolvedJobDefinition()

in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobDefinitionResolver.java [71:207]


    JobDefinition getResolvedJobDefinition(final String user, final JobDefinition givenJobDefnOp, final IJobClusterMetadata jobClusterMetadata) throws Exception {
        Preconditions.checkNotNull(givenJobDefnOp, "JobDefinition cannot be null");
        Preconditions.checkNotNull(jobClusterMetadata, "JobClusterMetadata cannot be null");

        JobDefinition resolvedJobDefn = givenJobDefnOp;

        logger.info("Given JobDefn {}", resolvedJobDefn);

        // inherit params from cluster if not specified
        List<Parameter> parameters = (resolvedJobDefn.getParameters() != null && !resolvedJobDefn.getParameters().isEmpty()) ? resolvedJobDefn.getParameters() : jobClusterMetadata.getJobClusterDefinition().getParameters();

        // Inherit labels from cluster, if resolvedJobDefn has labels, override the existing ones
        Map<String, Label> labelMap = jobClusterMetadata.getJobClusterDefinition().getLabels().stream()
            .collect(Collectors.toMap(Label::getName, label -> label));
        if (resolvedJobDefn.getLabels() != null && !resolvedJobDefn.getLabels().isEmpty()) {
            resolvedJobDefn.getLabels()
                .forEach(label -> labelMap.put(label.getName(), label));
        }
        List<Label> labels = Collections.unmodifiableList(new ArrayList<>(labelMap.values()));

        String artifactName = resolvedJobDefn.getArtifactName();
        String jobJarUrl = resolvedJobDefn.getJobJarUrl();
        SchedulingInfo schedulingInfo = resolvedJobDefn.getSchedulingInfo();
        String version = resolvedJobDefn.getVersion();
        JobClusterConfig jobClusterConfig = null;

        if(!isNull(artifactName) && !isNull(jobJarUrl) && !isNull(version) && !schedulingInfoNotValid(schedulingInfo)) {
            // update cluster ?

        } else if(!isNull(artifactName) && !isNull(jobJarUrl) && !isNull(version) && schedulingInfoNotValid(schedulingInfo)) { // scheduling Info is not given while new artifact is specified

            // exception
            String msg = String.format("Scheduling info is not specified during Job Submit for cluster %s while new artifact is specified %s. Job Submit fails", jobClusterMetadata.getJobClusterDefinition().getName(), artifactName);
            logger.warn(msg);
            throw new Exception(msg);

        } else if(!isNull(artifactName) && !isNull(jobJarUrl)&& isNull(version) && !schedulingInfoNotValid(schedulingInfo)) { // artifact & schedulingInfo are given

            // generate new version and update cluster
            version = String.valueOf(System.currentTimeMillis());
            // update cluster ?

        } else if(!isNull(artifactName) && !isNull(jobJarUrl) && isNull(version) && schedulingInfoNotValid(schedulingInfo)) { // scheduling info not given while new artifact is specified

            // exception
            String msg = String.format("Scheduling info is not specified during Job Submit for cluster %s while new artifact %s is specified. Job Submit fails", jobClusterMetadata.getJobClusterDefinition().getName(), artifactName);
            logger.warn(msg);
            throw new Exception(msg);

        } else if(isNull(artifactName) && isNull(jobJarUrl) && !isNull(version) && !schedulingInfoNotValid(schedulingInfo)) { // version is given & scheduling info is given

            // fetch JobCluster config for version and validate the given schedulingInfo is compatible
            Optional<JobClusterConfig> clusterConfigForVersion = getJobClusterConfigForVersion(jobClusterMetadata, version);
            if(!clusterConfigForVersion.isPresent()) {
                String msg = String.format("No Job Cluster config could be found for version %s in JobCluster %s. Job Submit fails", version, jobClusterMetadata.getJobClusterDefinition().getName());
                logger.warn(msg);
                throw new Exception(msg);
            }

            jobClusterConfig = clusterConfigForVersion.get();
            if(!validateSchedulingInfo(schedulingInfo, jobClusterConfig.getSchedulingInfo(), jobClusterMetadata)) {

                String msg = String.format("Given SchedulingInfo %s is incompatible with that associated with the given version %s in JobCluster %s. Job Submit fails", schedulingInfo, version, jobClusterMetadata.getJobClusterDefinition().getName());
                logger.warn(msg);
                throw new Exception(msg);
            }

            artifactName = jobClusterConfig.getArtifactName();
            jobJarUrl = jobClusterConfig.getJobJarUrl();

        } else if(isNull(artifactName) && isNull(jobJarUrl) && !isNull(version) && schedulingInfoNotValid(schedulingInfo)) { // Only version is given

            // fetch JobCluster config for version
            Optional<JobClusterConfig> clusterConfigForVersion = getJobClusterConfigForVersion(jobClusterMetadata, version);
            if(!clusterConfigForVersion.isPresent()) {
                String msg = String.format("No Job Cluster config could be found for version %s in JobCluster %s. Job Submit fails", version, jobClusterMetadata.getJobClusterDefinition().getName());
                logger.warn(msg);
                throw new Exception(msg);
            }

            jobClusterConfig = clusterConfigForVersion.get();
            schedulingInfo = jobClusterConfig.getSchedulingInfo();
            artifactName = jobClusterConfig.getArtifactName();
            jobJarUrl = jobClusterConfig.getJobJarUrl();


        } else if(isNull(artifactName) && isNull(jobJarUrl) && isNull(version) && !schedulingInfoNotValid(schedulingInfo)) { // only scheduling info is given

            // fetch latest Job Cluster config
            jobClusterConfig = jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig();
            version = jobClusterConfig.getVersion();
            artifactName = jobClusterConfig.getArtifactName();
            jobJarUrl = jobClusterConfig.getJobJarUrl();
            // set version to it
            // validate given scheduling info is compatible
            if(!validateSchedulingInfo(schedulingInfo, jobClusterConfig.getSchedulingInfo(), jobClusterMetadata)) {
                String msg = String.format("Given SchedulingInfo %s is incompatible with that associated with the given version %s in JobCluster %s which is %s. Job Submit fails", schedulingInfo, version, jobClusterMetadata.getJobClusterDefinition().getName(), jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig().getSchedulingInfo());
                logger.warn(msg);
                throw new Exception(msg);
            }


        } else if(isNull(artifactName) && isNull(jobJarUrl) && isNull(version) && schedulingInfoNotValid(schedulingInfo)){ // Nothing is given. Use the latest on the cluster

            // fetch latest job cluster config
            jobClusterConfig = jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig();
            // set version to it
            version = jobClusterConfig.getVersion();
            // use scheduling info from that.
            schedulingInfo = jobClusterConfig.getSchedulingInfo();
            artifactName = jobClusterConfig.getArtifactName();
            jobJarUrl = jobClusterConfig.getJobJarUrl();

        } else {
            // exception should never get here.
            throw new Exception(String.format("Invalid case for resolveJobDefinition artifactName %s jobJarUrl %s version %s schedulingInfo %s", jobJarUrl, artifactName, version, schedulingInfo));
        }

        logger.info("Resolved version {}, schedulingInfo {}, artifactName {}, jobJarUrl {}", version, schedulingInfo, artifactName, jobJarUrl);

        if(isNull(artifactName)  || isNull(jobJarUrl)  || isNull(version) || schedulingInfoNotValid(schedulingInfo)) {
            String msg = String.format(" SchedulingInfo %s or artifact %s or jobJarUrl %s or version %s could not be resolved in JobCluster %s. Job Submit fails", schedulingInfo, artifactName, jobJarUrl, version, jobClusterMetadata.getJobClusterDefinition().getName());
            logger.warn(msg);
            throw new Exception(msg);
        }
        return new JobDefinition.Builder()
                .from(resolvedJobDefn)
                .withParameters(parameters)
                .withLabels(labels)
                .withSchedulingInfo(schedulingInfo)
                .withUser(user)
                .withVersion(version)
                .withArtifactName(artifactName)
                .withJobJarUrl(jobJarUrl)
                .build();

    }