private DuccWorkJob create()

in uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/factory/JobFactory.java [456:736]


	/**
	 * Populates the given work item from the submitted request properties.
	 * <p>
	 * In order, this method: normalizes the request; records the service
	 * deployment type and service id; sweeps the log directory; fills in the
	 * standard and scheduling info; builds either the Java pipeline command
	 * line (when {@code isJpUima} holds for the work item's type) or the
	 * non-Java "ducclet" command line; and finally applies the failure limits,
	 * service dependencies, service endpoint and cancel-on-interrupt settings.
	 *
	 * @param jobRequestProperties the submitted request; it is normalized in
	 *        place before being read
	 * @param job the work item to populate; its DuccId and DuccType are read
	 *        and everything else configured here is written to it
	 * @return the same {@code job} instance, populated
	 */
	private DuccWorkJob create(JobRequestProperties jobRequestProperties, DuccWorkJob job) {
		String methodName = "create";
		jobRequestProperties.normalize();
		DuccId jobid = job.getDuccId();
		DuccType duccType = job.getDuccType();
		// Service Deployment Type: first matching key wins (custom, other, uima)
		if (jobRequestProperties.containsKey(ServiceRequestProperties.key_service_type_custom)) {
			job.setServiceDeploymentType(ServiceDeploymentType.custom);
		} else if (jobRequestProperties.containsKey(ServiceRequestProperties.key_service_type_other)) {
			job.setServiceDeploymentType(ServiceDeploymentType.other);
		} else if (jobRequestProperties.containsKey(ServiceRequestProperties.key_service_type_uima)) {
			job.setServiceDeploymentType(ServiceDeploymentType.uima);
		} else {
			job.setServiceDeploymentType(ServiceDeploymentType.unspecified);
		}
		// Service Id (null when the request does not carry one)
		String serviceId = null;
		if (jobRequestProperties.containsKey(ServiceRequestProperties.key_service_id)) {
			serviceId = jobRequestProperties.getProperty(ServiceRequestProperties.key_service_id);
		}
		job.setServiceId(serviceId);

		// sweep out leftover logging trash
		logSweeper(jobRequestProperties.getProperty(JobRequestProperties.key_log_directory), job.getDuccId());
		// log
		jobRequestProperties.specification(logger, job.getDuccId());
		// java command; when null, the Agent will set javaCmd for Driver and Processes
		String javaCmd = jobRequestProperties.getProperty(JobSpecificationProperties.key_jvm);
		// standard info
		DuccStandardInfo standardInfo = new DuccStandardInfo();
		job.setStandardInfo(standardInfo);
		standardInfo.setUser(jobRequestProperties.getProperty(JobSpecificationProperties.key_user));
		standardInfo.setSubmitter(jobRequestProperties.getProperty(JobSpecificationProperties.key_submitter_pid_at_host));
		standardInfo.setDateOfSubmission(TimeStamp.getCurrentMillis());
		standardInfo.setDateOfCompletion(null);
		standardInfo.setDescription(jobRequestProperties.getProperty(JobSpecificationProperties.key_description));
		standardInfo.setLogDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
		standardInfo.setWorkingDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_working_directory));
		String notifications = jobRequestProperties.getProperty(JobSpecificationProperties.key_notifications);
		if(notifications == null) {
			standardInfo.setNotifications(null);
		}
		else {
			// Split on any run of commas and/or whitespace. The former pattern " ,"
			// matched only a space immediately followed by a comma, so lists such
			// as "a,b" or "a, b" were never split. Empty tokens (caused by leading
			// or trailing delimiters) are dropped.
			List<String> notificationList = new ArrayList<String>();
			for(String notification : notifications.split("[\\s,]+")) {
				if(!notification.isEmpty()) {
					notificationList.add(notification);
				}
			}
			standardInfo.setNotifications(notificationList.toArray(new String[notificationList.size()]));
		}

		// scheduling info
		DuccSchedulingInfo schedulingInfo = new DuccSchedulingInfo();
		job.setSchedulingInfo(schedulingInfo);
		String memory_process_size = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_memory_size);
		long jpGB = JobFactoryHelper.getByteSizeJobProcess(memory_process_size) / JobFactoryHelper.GB;
		if(jpGB > 0) {
			schedulingInfo.setMemorySizeRequested(""+jpGB);
		}
		schedulingInfo.setSchedulingClass(jobRequestProperties.getProperty(JobSpecificationProperties.key_scheduling_class));
		schedulingInfo.setMachineList(jobRequestProperties.getProperty(JobSpecificationProperties.key_machine_list));
		schedulingInfo.setSchedulingPriority(jobRequestProperties.getProperty(JobSpecificationProperties.key_scheduling_priority));
		schedulingInfo.setProcessesMax(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_deployments_max));
		schedulingInfo.setProcessesMin(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_deployments_min));
		schedulingInfo.setThreadsPerProcess(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_pipeline_count));
		// NOTE(review): this second call is given the raw request value after the
		// computed GB value was set above; whether it replaces that value (also
		// when the property is null) depends on the setter -- confirm intent.
		schedulingInfo.setMemorySizeRequested(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_memory_size));
		schedulingInfo.setMemoryUnits(MemoryUnits.GB);

		if (job.getDuccType() == DuccType.Job){
			checkSchedulingLimits(job, schedulingInfo);
		}

		// process_initialization_time_max (in minutes)
		String pi_time = jobRequestProperties.getProperty(JobRequestProperties.key_process_initialization_time_max);
		if(pi_time == null) {
			pi_time = DuccPropertiesResolver.get(DuccPropertiesResolver.ducc_default_process_init_time_max);
		}
		try {
			// convert minutes to milliseconds
			long value = Long.parseLong(pi_time)*60*1000;
			standardInfo.setProcessInitializationTimeMax(value);
		}
		catch(Exception e) {
			// a missing or non-numeric value is logged; no max is set in that case
			logger.error(methodName, job.getDuccId(), e);
		}
		// jp or sp
		JavaCommandLine pipelineCommandLine = new JavaCommandLine(javaCmd);
		pipelineCommandLine.setClassName("main:provided-by-Process-Manager");
		ServiceDeploymentType serviceDeploymentType = job.getServiceDeploymentType();
		switch(duccType) {
		case Service:
			// for services the DD is passed as a command line argument
			String name = JobSpecificationProperties.key_process_DD;
			String arg = jobRequestProperties.getProperty(name);
			logger.debug(methodName, job.getDuccId(), name+": "+arg);
			pipelineCommandLine.addArgument(arg);
			break;
		default:
			break;
		}
		if(isJpUima(duccType, serviceDeploymentType)) {
			String process_DD = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_DD);
			if(process_DD != null) {
				// user DD
				IDuccUimaDeploymentDescriptor uimaDeploymentDescriptor = new DuccUimaDeploymentDescriptor(process_DD);
				job.setUimaDeployableConfiguration(uimaDeploymentDescriptor);
				dump(job, uimaDeploymentDescriptor);
			}
			else {
				// UIMA aggregate assembled from the optional CM, AE and CC descriptors
				String jdQueuePrefix = dpr.getProperty(DuccPropertiesResolver.ducc_jd_queue_prefix);
				String name = jdQueuePrefix+job.getDuccId().toString();
				String description = job.getStandardInfo().getDescription();
				int threadCount = Integer.parseInt(job.getSchedulingInfo().getThreadsPerProcess());
				String brokerURL = job.getjobBroker();
				String endpoint = job.getjobQueue();
				ArrayList<IDuccUimaAggregateComponent> components = new ArrayList<IDuccUimaAggregateComponent>();
				String CMDescriptor = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CM);
				if(CMDescriptor != null) {
					ArrayList<String> CMOverrides = toArrayList(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CM_overrides));
					IDuccUimaAggregateComponent componentCM = new DuccUimaAggregateComponent(CMDescriptor, CMOverrides);
					components.add(componentCM);
				}
				String AEDescriptor = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_AE);
				if(AEDescriptor != null) {
					ArrayList<String> AEOverrides = toArrayList(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_AE_overrides));
					IDuccUimaAggregateComponent componentAE = new DuccUimaAggregateComponent(AEDescriptor, AEOverrides);
					components.add(componentAE);
				}
				String CCDescriptor = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CC);
				if(CCDescriptor != null) {
					ArrayList<String> CCOverrides = toArrayList(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CC_overrides));
					IDuccUimaAggregateComponent componentCC = new DuccUimaAggregateComponent(CCDescriptor, CCOverrides);
					components.add(componentCC);
				}
				IDuccUimaAggregate uimaAggregate = new DuccUimaAggregate(name,description,threadCount,brokerURL,endpoint,components);
				job.setUimaDeployableConfiguration(uimaAggregate);
				dump(job, uimaAggregate);
			}
			// user CP
			String prependUserCP = getPrependUserCP(jobid, jobRequestProperties);
			String userCP = jobRequestProperties.getProperty(JobSpecificationProperties.key_classpath);
			userCP = addUimaDucc(prependUserCP, userCP);
			pipelineCommandLine.setClasspath(userCP);
			// jvm args
			String process_jvm_args = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_jvm_args);
			ArrayList<String> pTokens = QuotedOptions.tokenizeList(process_jvm_args, true);
			for(String token : pTokens) {
				pipelineCommandLine.addOption(token);
			}
			// Add any site-provided JVM opts
			String siteJvmArgs = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_process_jvm_args);
			pTokens = QuotedOptions.tokenizeList(siteJvmArgs, true);   // a null arg is acceptable
			for(String token : pTokens) {
				pipelineCommandLine.addOption(token);
			}
			// add ducc CP
			String duccCP = getDuccClasspath(1);
			String opt = FlagsHelper.Name.DuccClasspath.dname()+"="+duccCP;
			logger.debug(methodName, job.getDuccId(), "opt pipeline: "+opt);
			pipelineCommandLine.addOption(opt);
			// add JpType
			if(process_DD != null) {
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpType, "uima-as");
				// <flags for JP to build DD>
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpDd, process_DD);
				addDashD(pipelineCommandLine, FlagsHelper.Name.JobDirectory, jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpThreadCount, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_pipeline_count));
				// </flags for JP to build DD>
			}
			else {
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpType, "uima");
				// <flags for JP to build Aggregate>
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpAeDescriptor, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_AE));
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpAeOverrides, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_AE_overrides));
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpCcDescriptor, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CC));
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpCcOverrides, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CC_overrides));
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpCmDescriptor, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CM));
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpCmOverrides, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CM_overrides));
				addDashD(pipelineCommandLine, FlagsHelper.Name.JobDirectory, jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
				String keyFCRS = "ducc.flow-controller.specifier";
				String valueFCRS = DuccPropertiesResolver.getInstance().getFileProperty(keyFCRS);
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpFlowController, valueFCRS);
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpThreadCount, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_pipeline_count));
				// </flags for JP to build Aggregate>
			}

			String process_error_threshold = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_error_threshold);
			if(process_error_threshold != null) {
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpErrorThreshold, process_error_threshold);
			}
			String process_error_window = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_error_window);
			if(process_error_window != null) {
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpErrorWindowSize, process_error_window);
			}

			// NOTE(review): JpThreadCount was already passed to addDashD in both
			// branches above, so this may add the flag a second time -- confirm
			// how addDashD treats repeated and null values before removing either.
			String process_thread_count = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_pipeline_count);
			if(process_thread_count != null) {
				addDashD(pipelineCommandLine, FlagsHelper.Name.JpThreadCount, process_thread_count);
			}

			String processEnvironmentVariables = jobRequestProperties.getProperty(JobSpecificationProperties.key_environment);
			int envCountProcess = addEnvironment(job, "process", pipelineCommandLine, processEnvironmentVariables);
			logger.info(methodName, job.getDuccId(), "process env vars: "+envCountProcess);
			logger.debug(methodName, job.getDuccId(), "pipeline: "+pipelineCommandLine.getCommand());
			pipelineCommandLine.setLogDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
			job.setCommandLine(pipelineCommandLine);
		}
		else {
			// ducclet (sometimes known as arbitrary process)
			String process_executable = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_executable);
			NonJavaCommandLine executableProcessCommandLine = new NonJavaCommandLine(process_executable);
			String processEnvironmentVariables = jobRequestProperties.getProperty(JobSpecificationProperties.key_environment);
			int envCountProcess = addEnvironment(job, "process", executableProcessCommandLine, processEnvironmentVariables);
			logger.info(methodName, job.getDuccId(), "process env vars: "+envCountProcess);
			logger.debug(methodName, job.getDuccId(), "ducclet: "+executableProcessCommandLine.getCommandLineString());
			job.setCommandLine(executableProcessCommandLine);
			// Tokenize arguments string and strip any quotes, then add to command line.
			// Note: placeholders replaced by CLI so can avoid the add method.
			List<String> process_executable_arguments = QuotedOptions.tokenizeList(
					jobRequestProperties.getProperty(JobSpecificationProperties.key_process_executable_args), true);
			executableProcessCommandLine.getArguments().addAll(process_executable_arguments);
		}
		// process_initialization_failures_cap
		// ?? These are not set for APs or SPs ??
		// A missing (null) value fails the parse and is reported as "invalid" too.
		String failures_cap = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_initialization_failures_cap);
		try {
			long process_failures_cap = Long.parseLong(failures_cap);
			if(process_failures_cap > 0) {
				job.setProcessInitFailureCap(process_failures_cap);
			}
			else {
				logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_initialization_failures_cap+": "+failures_cap);
			}
		}
		catch(Exception e) {
			logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_initialization_failures_cap+": "+failures_cap);
		}
		// process_failures_limit (same handling as the failures cap above)
		String failures_limit = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_failures_limit);
		try {
			long process_failures_limit = Long.parseLong(failures_limit);
			if(process_failures_limit > 0) {
				job.setProcessFailureLimit(process_failures_limit);
			}
			else {
				logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_failures_limit+": "+failures_limit);
			}
		}
		catch(Exception e) {
			logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_failures_limit+": "+failures_limit);
		}
		//
		// Set the service dependency, if there is one.
		//
		String depstr = jobRequestProperties.getProperty(JobSpecificationProperties.key_service_dependency);
		if ( depstr == null ) {
			logger.debug(methodName, job.getDuccId(), "No service dependencies");
		} else {
			logger.debug(methodName, job.getDuccId(), "Adding service dependency", depstr);
			// trim first so leading whitespace does not yield an empty first entry
			String[] deps = depstr.trim().split("\\s+");
			job.setServiceDependencies(deps);
		}
		// Service Endpoint
		String ep = jobRequestProperties.getProperty(ServiceRequestProperties.key_service_request_endpoint);
		if ( ep == null ) {
			logger.debug(methodName, job.getDuccId(), "No service endpoint");
		} else {
			logger.debug(methodName, job.getDuccId(), "Adding service endpoint", ep);
			job.setServiceEndpoint(ep);
		}
		// Cancel On Interrupt: either the job key or the managed-reservation key enables it
		if(jobRequestProperties.containsKey(JobSpecificationProperties.key_cancel_on_interrupt)) {
			job.setCancelOnInterrupt();
		}
		else if(jobRequestProperties.containsKey(ReservationSpecificationProperties.key_cancel_managed_reservation_on_interrupt)) {
			job.setCancelOnInterrupt();
		}
		//TODO be sure to clean-up fpath upon job completion!
		return job;
	}