in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java [498:717]
/**
 * Handles the arrival of a unit of work: converts the incoming {@code IDuccWork} into an
 * {@code IRmJob}, resolves and validates its scheduling class, and hands it to
 * {@code receiveExecutable} (Job, Service, Pop) or {@code receiveReservation} (Reservation).
 *
 * <p>Every path that gives up on the work refuses it through {@code refuse(IRmJob, String)}
 * before returning {@code false}.
 *
 * @param job the work as presented by the caller; its scheduling info, standard info and
 *            DUCC type are read here
 * @return {@code false} if the work was refused early (no scheduling class, unknown class,
 *         unauthorized user while not recovering, unknown work type); otherwise the result of
 *         {@code receiveExecutable} / {@code receiveReservation}
 * @throws NumberFormatException if the submission date in the standard info is not a parsable long
 */
boolean jobArrives(IDuccWork job)
{
    String methodName = "jobArrives";
    logger.trace(methodName, job.getDuccId(), "Job arives");
    logger.trace(methodName, job.getDuccId(), "Job is of type", job.getDuccType());

    // Convert Lou's structure into mine.
    IRmJob j = new RmJob(job.getDuccId());

    boolean mustRecover = isRecovered(job);        // UIMA-4142

    IDuccSchedulingInfo si = job.getSchedulingInfo();
    IDuccStandardInfo sti = job.getStandardInfo();

    String name = sti.getDescription();
    if ( name == null ) {
        name = "A Job With No Name.";
    }
    String user_name = sti.getUser().trim();
    j.setUserName(user_name);
    j.setJobName(name);
    j.setServiceId(toLong(job.getServiceId(), 0L));   // UIMA-4712 only non-zero on actual service instances

    int threads        = toInt(si.getThreadsPerProcess(), scheduler.getDefaultNThreads());
    int user_priority  = toInt(si.getSchedulingPriority(), 100);

    int total_work     = toInt(si.getWorkItemsTotal(), scheduler.getDefaultNTasks());
    int completed_work = toInt(si.getWorkItemsCompleted(), 0);
    // Never let this go 0 or negative - both cases are (probably user) errors.
    int remaining_work = Math.max(total_work - completed_work, 1);

    logger.info(methodName, job.getDuccId(), "total_work", total_work, "completed_work", completed_work,"remaining_work", remaining_work);

    int memory = toInt(si.getMemorySizeRequested(), scheduler.getDefaultMemory());

    String className = si.getSchedulingClass();
    if ( className == null ) {
        // No class requested: fall back to the configured default for this kind of work.
        switch ( job.getDuccType() ) {
            case Job:
                className = scheduler.getDefaultFairShareName();
                break;
            case Service:
            case Pop:
            case Reservation:
                className = scheduler.getDefaultReserveName();
                break;
        }
        if ( className == null ) {
            // Go through the converter-level refuse(), exactly as every other refusal in this
            // method does, instead of calling j.refuse() directly.
            // NOTE(review): presumably refuse(IRmJob, String) also records the job so the
            // refusal is reported back - confirm against its definition.
            refuse(j, "No scheduling class defined and no default class configured.");
            return false;
        }
    }

    j.setThreads(threads);
    j.setUserPriority(user_priority);
    j.setNQuestions(total_work, remaining_work, 0.0);
    j.setClassName(className);

    List<String> machineList = si.getMachineList();
    j.setMachineList(machineList);
    if ( machineList != null && !machineList.isEmpty() ) {
        String message = "machine list: "+String.join(" ", machineList);
        logger.info(methodName, job.getDuccId(), message);
    }

    switch (si.getMemoryUnits()) {
        case GB:
            break;
        default:
            // NOTE(review): the message says "Job returned." but processing continues below;
            // the job is neither refused nor returned here.
            logger.warn(methodName, job.getDuccId(), "Memory units other than GB are not currently supported.  Job returned.");
            break;
    }
    j.setMemory(memory);
    j.init();

    j.setTimestamp(Long.parseLong(sti.getDateOfSubmission()));

    if ( job instanceof IDuccWorkJob ) {
        j.setInitWait( ((IDuccWorkJob) job).isRunnable());
    } else {
        j.setInitWait(true);                          // pop is always ready to go
    }

    // Ugly and artificial, but it's needed so messages can be made legible.
    j.setDuccType(job.getDuccType());

    switch ( job.getDuccType() ) {                    // UIMA-4142 to distinguish between service and AP
        case Service:
        case Pop:
            if ( ((IDuccWorkService)job).getServiceDeploymentType() == ServiceDeploymentType.other )  {
                j.setArbitraryProcess();
            }
            break;
        default:
            break;
    }

    //
    // Now: must either create a new job, or recover one that we didn't know about, on the assumption that we
    // have just crashed and are recovering.
    //
    // Be SURE that if status is turned false for any reason, or if you exit early with false, that you
    // refuse() the job.
    //
    boolean status = true;

    int max_processes = 0;

    ResourceClass rescl = scheduler.getResourceClass(className);
    if ( rescl == null ) {
        // oh darn, we can't continue past this point
        refuse(j, "Cannot find priority class " + className + " for job");

        // UIMA-4142
        // However, if this is recovery and we get here, it's because somehow the class definition
        // got deleted.  In this case there might be resources assigned.  We must evict if possible.
        // All affected hosts must be blacklisted.  We need to remember all this so we can unblacklist them
        // if the resources ever become free.
        blacklist(job, memory);
        return false;
    }

    if ( !rescl.authorized(user_name) ) {
        // UIMA-4275
        // if not recovering, and the class is not authorized, stop it dead here
        // if we are recovering, might no longer be authorized - the main scheduler will
        // deal with this as appropriate for the scheduling policy.
        refuse(j, "User '" + user_name + "' not authorized to use class '" + className + "'");
        if ( ! mustRecover ) {
            return false;
        }
    }

    j.setResourceClass(rescl);

    switch ( job.getDuccType() ) {
        // UIMA-4275, must enforce max allocations as 1 for Service and Pop
        case Service:
        case Pop:
            switch ( rescl.getPolicy() ) {
                case FAIR_SHARE:
                    // NOTE(review): the job is refused here but still passed to
                    // receiveExecutable() below and status is not forced to false - confirm
                    // this is intended.
                    refuse(j, "Services and managed reservations are not allowed to be FAIR_SHARE");
                    break;
                case FIXED_SHARE:
                case RESERVE:
                    j.setMaxShares(1);
                    break;
            }
            status = receiveExecutable(j, job, mustRecover);  // UIMA-4142, add mustRecover flag
            logger.trace(methodName, j.getId(), "Serivce, or Pop arrives, accepted:", status);
            break;

        case Job:
            // instance and share count are a function of the class
            max_processes = toInt(si.getProcessesMax(), DEFAULT_PROCESSES);
            switch ( rescl.getPolicy() ) {
                case FAIR_SHARE:
                case FIXED_SHARE:
                case RESERVE:
                    // Every policy currently caps the job at the requested process count.
                    j.setMaxShares(max_processes);
                    break;
            }
            status = receiveExecutable(j, job, mustRecover);  // UIMA-4142, add mustRecover flag
            logger.trace(methodName, j.getId(), "Job arrives, accepted:", status);
            break;

        case Reservation:
            // UIMA-4275. non-jobs restricted to exactly one allocation per request
            j.setMaxShares(1);
            status = receiveReservation(j, job, mustRecover); // UIMA-4142, add mustRecover flag
            logger.trace(methodName, j.getId(), "Reservation arrives, accepted:", status);
            break;

        default:
            refuse(j, "Unknown job type: " + job.getDuccType());
            status = false;
            break;
    }

    return status;
}