in uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/factory/JobFactory.java [456:736]
private DuccWorkJob create(JobRequestProperties jobRequestProperties, DuccWorkJob job) {
String methodName = "create";
jobRequestProperties.normalize();
DuccId jobid = job.getDuccId();
DuccType duccType = job.getDuccType();
// Service Deployment Type
if (jobRequestProperties.containsKey(ServiceRequestProperties.key_service_type_custom)) {
job.setServiceDeploymentType(ServiceDeploymentType.custom);
} else if (jobRequestProperties.containsKey(ServiceRequestProperties.key_service_type_other)) {
job.setServiceDeploymentType(ServiceDeploymentType.other);
} else if (jobRequestProperties.containsKey(ServiceRequestProperties.key_service_type_uima)) {
job.setServiceDeploymentType(ServiceDeploymentType.uima);
} else {
job.setServiceDeploymentType(ServiceDeploymentType.unspecified);
}
// Service Id
String serviceId = null;
if (jobRequestProperties.containsKey(ServiceRequestProperties.key_service_id)) {
serviceId = jobRequestProperties.getProperty(ServiceRequestProperties.key_service_id);
}
job.setServiceId(serviceId);
// sweep out leftover logging trash
logSweeper(jobRequestProperties.getProperty(JobRequestProperties.key_log_directory), job.getDuccId());
// log
jobRequestProperties.specification(logger, job.getDuccId());
// java command
String javaCmd = jobRequestProperties.getProperty(JobSpecificationProperties.key_jvm);
if(javaCmd == null) {
// Agent will set javaCmd for Driver and Processes
}
// standard info
DuccStandardInfo standardInfo = new DuccStandardInfo();
job.setStandardInfo(standardInfo);
standardInfo.setUser(jobRequestProperties.getProperty(JobSpecificationProperties.key_user));
standardInfo.setSubmitter(jobRequestProperties.getProperty(JobSpecificationProperties.key_submitter_pid_at_host));
standardInfo.setDateOfSubmission(TimeStamp.getCurrentMillis());
standardInfo.setDateOfCompletion(null);
standardInfo.setDescription(jobRequestProperties.getProperty(JobSpecificationProperties.key_description));
standardInfo.setLogDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
standardInfo.setWorkingDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_working_directory));
String notifications = jobRequestProperties.getProperty(JobSpecificationProperties.key_notifications);
if(notifications == null) {
standardInfo.setNotifications(null);
}
else {
String[] notificationsArray = notifications.split(" ,");
for(int i=0; i < notificationsArray.length; i++) {
notificationsArray[i] = notificationsArray[i].trim();
}
standardInfo.setNotifications(notificationsArray);
}
// scheduling info
DuccSchedulingInfo schedulingInfo = new DuccSchedulingInfo();
job.setSchedulingInfo(schedulingInfo);
String memory_process_size = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_memory_size);
long jpGB = JobFactoryHelper.getByteSizeJobProcess(memory_process_size) / JobFactoryHelper.GB;
if(jpGB > 0) {
schedulingInfo.setMemorySizeRequested(""+jpGB);
}
schedulingInfo.setSchedulingClass(jobRequestProperties.getProperty(JobSpecificationProperties.key_scheduling_class));
schedulingInfo.setMachineList(jobRequestProperties.getProperty(JobSpecificationProperties.key_machine_list));
schedulingInfo.setSchedulingPriority(jobRequestProperties.getProperty(JobSpecificationProperties.key_scheduling_priority));
schedulingInfo.setProcessesMax(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_deployments_max));
schedulingInfo.setProcessesMin(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_deployments_min));
schedulingInfo.setThreadsPerProcess(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_pipeline_count));
schedulingInfo.setMemorySizeRequested(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_memory_size));
schedulingInfo.setMemoryUnits(MemoryUnits.GB);
if (job.getDuccType() == DuccType.Job){
checkSchedulingLimits(job, schedulingInfo);
}
// process_initialization_time_max (in minutes)
String pi_time = jobRequestProperties.getProperty(JobRequestProperties.key_process_initialization_time_max);
if(pi_time == null) {
pi_time = DuccPropertiesResolver.get(DuccPropertiesResolver.ducc_default_process_init_time_max);
}
try {
long value = Long.parseLong(pi_time)*60*1000;
standardInfo.setProcessInitializationTimeMax(value);
}
catch(Exception e) {
logger.error(methodName, job.getDuccId(), e);
}
// jp or sp
JavaCommandLine pipelineCommandLine = new JavaCommandLine(javaCmd);
pipelineCommandLine.setClassName("main:provided-by-Process-Manager");
ServiceDeploymentType serviceDeploymentType = job.getServiceDeploymentType();
switch(duccType) {
case Service:
String name = JobSpecificationProperties.key_process_DD;
String arg = jobRequestProperties.getProperty(name);
logger.debug(methodName, job.getDuccId(), name+": "+arg);
pipelineCommandLine.addArgument(arg);
break;
default:
break;
}
if(isJpUima(duccType, serviceDeploymentType)) {
String process_DD = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_DD);
if(process_DD != null) {
// user DD
IDuccUimaDeploymentDescriptor uimaDeploymentDescriptor = new DuccUimaDeploymentDescriptor(process_DD);
job.setUimaDeployableConfiguration(uimaDeploymentDescriptor);
dump(job, uimaDeploymentDescriptor);
}
else {
// UIMA aggregate
String jdQueuePrefix = dpr.getProperty(DuccPropertiesResolver.ducc_jd_queue_prefix);
String name = jdQueuePrefix+job.getDuccId().toString();
String description = job.getStandardInfo().getDescription();
int threadCount = Integer.parseInt(job.getSchedulingInfo().getThreadsPerProcess());
String brokerURL = job.getjobBroker();;
String endpoint = job.getjobQueue();
ArrayList<IDuccUimaAggregateComponent> components = new ArrayList<IDuccUimaAggregateComponent>();
String CMDescriptor = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CM);
if(CMDescriptor != null) {
ArrayList<String> CMOverrides = toArrayList(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CM_overrides));
IDuccUimaAggregateComponent componentCM = new DuccUimaAggregateComponent(CMDescriptor, CMOverrides);
components.add(componentCM);
}
String AEDescriptor = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_AE);
if(AEDescriptor != null) {
ArrayList<String> AEOverrides = toArrayList(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_AE_overrides));
IDuccUimaAggregateComponent componentAE = new DuccUimaAggregateComponent(AEDescriptor, AEOverrides);
components.add(componentAE);
}
String CCDescriptor = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CC);
if(CCDescriptor != null) {
ArrayList<String> CCOverrides = toArrayList(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CC_overrides));
IDuccUimaAggregateComponent componentCC = new DuccUimaAggregateComponent(CCDescriptor, CCOverrides);
components.add(componentCC);
}
IDuccUimaAggregate uimaAggregate = new DuccUimaAggregate(name,description,threadCount,brokerURL,endpoint,components);
job.setUimaDeployableConfiguration(uimaAggregate);
dump(job, uimaAggregate);
}
// user CP
String prependUserCP = getPrependUserCP(jobid, jobRequestProperties);
String userCP = jobRequestProperties.getProperty(JobSpecificationProperties.key_classpath);
userCP = addUimaDucc(prependUserCP, userCP);
pipelineCommandLine.setClasspath(userCP);
// jvm args
String process_jvm_args = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_jvm_args);
ArrayList<String> pTokens = QuotedOptions.tokenizeList(process_jvm_args, true);
for(String token : pTokens) {
pipelineCommandLine.addOption(token);
}
// Add any site-provided JVM opts
String siteJvmArgs = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_process_jvm_args);
pTokens = QuotedOptions.tokenizeList(siteJvmArgs, true); // a null arg is acceptable
for(String token : pTokens) {
pipelineCommandLine.addOption(token);
}
// add ducc CP
String duccCP = getDuccClasspath(1);
String opt = FlagsHelper.Name.DuccClasspath.dname()+"="+duccCP;
logger.debug(methodName, job.getDuccId(), "opt pipeline: "+opt);
pipelineCommandLine.addOption(opt);
// add JpType
if(process_DD != null) {
addDashD(pipelineCommandLine, FlagsHelper.Name.JpType, "uima-as");
// <flags for JP to build DD>
addDashD(pipelineCommandLine, FlagsHelper.Name.JpDd, process_DD);
addDashD(pipelineCommandLine, FlagsHelper.Name.JobDirectory, jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
addDashD(pipelineCommandLine, FlagsHelper.Name.JpThreadCount, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_pipeline_count));
// </flags for JP to build DD>
}
else {
addDashD(pipelineCommandLine, FlagsHelper.Name.JpType, "uima");
// <flags for JP to build Aggregate>
addDashD(pipelineCommandLine, FlagsHelper.Name.JpAeDescriptor, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_AE));
addDashD(pipelineCommandLine, FlagsHelper.Name.JpAeOverrides, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_AE_overrides));
addDashD(pipelineCommandLine, FlagsHelper.Name.JpCcDescriptor, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CC));
addDashD(pipelineCommandLine, FlagsHelper.Name.JpCcOverrides, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CC_overrides));
addDashD(pipelineCommandLine, FlagsHelper.Name.JpCmDescriptor, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CM));
addDashD(pipelineCommandLine, FlagsHelper.Name.JpCmOverrides, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CM_overrides));
addDashD(pipelineCommandLine, FlagsHelper.Name.JobDirectory, jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
String keyFCRS = "ducc.flow-controller.specifier";
String valueFCRS = DuccPropertiesResolver.getInstance().getFileProperty(keyFCRS);
addDashD(pipelineCommandLine, FlagsHelper.Name.JpFlowController, valueFCRS);
addDashD(pipelineCommandLine, FlagsHelper.Name.JpThreadCount, jobRequestProperties.getProperty(JobSpecificationProperties.key_process_pipeline_count));
// </flags for JP to build Aggregate>
}
String process_error_threshold = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_error_threshold);
if(process_error_threshold != null) {
addDashD(pipelineCommandLine, FlagsHelper.Name.JpErrorThreshold, process_error_threshold);
}
String process_error_window = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_error_window);
if(process_error_window != null) {
addDashD(pipelineCommandLine, FlagsHelper.Name.JpErrorWindowSize, process_error_window);
}
String process_thread_count = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_pipeline_count);
if(process_thread_count != null) {
addDashD(pipelineCommandLine, FlagsHelper.Name.JpThreadCount, process_thread_count);
}
String processEnvironmentVariables = jobRequestProperties.getProperty(JobSpecificationProperties.key_environment);
int envCountProcess = addEnvironment(job, "process", pipelineCommandLine, processEnvironmentVariables);
logger.info(methodName, job.getDuccId(), "process env vars: "+envCountProcess);
logger.debug(methodName, job.getDuccId(), "pipeline: "+pipelineCommandLine.getCommand());
pipelineCommandLine.setLogDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
job.setCommandLine(pipelineCommandLine);
}
else {
// ducclet (sometimes known as arbitrary process)
String process_executable = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_executable);
NonJavaCommandLine executableProcessCommandLine = new NonJavaCommandLine(process_executable);
String processEnvironmentVariables = jobRequestProperties.getProperty(JobSpecificationProperties.key_environment);
int envCountProcess = addEnvironment(job, "process", executableProcessCommandLine, processEnvironmentVariables);
logger.info(methodName, job.getDuccId(), "process env vars: "+envCountProcess);
logger.debug(methodName, job.getDuccId(), "ducclet: "+executableProcessCommandLine.getCommandLineString());
job.setCommandLine(executableProcessCommandLine);
// Tokenize arguments string and strip any quotes, then add to command line.
// Note: placeholders replaced by CLI so can avoid the add method.
List<String> process_executable_arguments = QuotedOptions.tokenizeList(
jobRequestProperties.getProperty(JobSpecificationProperties.key_process_executable_args), true);
executableProcessCommandLine.getArguments().addAll(process_executable_arguments);
}
// process_initialization_failures_cap
// ?? These are not set for APs or SPs ??
String failures_cap = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_initialization_failures_cap);
try {
long process_failures_cap = Long.parseLong(failures_cap);
if(process_failures_cap > 0) {
job.setProcessInitFailureCap(process_failures_cap);
}
else {
logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_initialization_failures_cap+": "+failures_cap);
}
}
catch(Exception e) {
logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_initialization_failures_cap+": "+failures_cap);
}
// process_failures_limit
String failures_limit = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_failures_limit);
try {
long process_failures_limit = Long.parseLong(failures_limit);
if(process_failures_limit > 0) {
job.setProcessFailureLimit(process_failures_limit);
}
else {
logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_failures_limit+": "+failures_limit);
}
}
catch(Exception e) {
logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_failures_limit+": "+failures_limit);
}
//
// Set the service dependency, if there is one.
//
String depstr = jobRequestProperties.getProperty(JobSpecificationProperties.key_service_dependency);
if ( depstr == null ) {
logger.debug(methodName, job.getDuccId(), "No service dependencies");
} else {
logger.debug(methodName, job.getDuccId(), "Adding service dependency", depstr);
String[] deps = depstr.split("\\s+");
job.setServiceDependencies(deps);
}
// Service Endpoint
String ep = jobRequestProperties.getProperty(ServiceRequestProperties.key_service_request_endpoint);
if ( ep == null ) {
logger.debug(methodName, job.getDuccId(), "No service endpoint");
} else {
logger.debug(methodName, job.getDuccId(), "Adding service endpoint", ep);
job.setServiceEndpoint(ep);
}
// Cancel On Interrupt
if(jobRequestProperties.containsKey(JobSpecificationProperties.key_cancel_on_interrupt)) {
job.setCancelOnInterrupt();
}
else if(jobRequestProperties.containsKey(ReservationSpecificationProperties.key_cancel_managed_reservation_on_interrupt)) {
job.setCancelOnInterrupt();
}
//TODO be sure to clean-up fpath upon job completion!
return job;
}