boolean jobArrives()

in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java [498:717]


    /**
     * Converts an incoming unit of work into an {@link IRmJob}, validates its scheduling
     * class and the user's authorization for it, and passes it on to
     * {@code receiveExecutable} (jobs, services, pops) or {@code receiveReservation}
     * (reservations).
     *
     * <p>Rejections are reported through the converter-level {@code refuse(j, reason)}.
     * Two paths refuse the job yet keep processing it: an unauthorized user while
     * recovering (UIMA-4275), and a service/pop submitted to a FAIR_SHARE class.
     *
     * @param job the work description to convert
     * @return {@code false} if the job was refused before reaching the receive step
     *         (no usable scheduling class, unknown class, unauthorized user when not
     *         recovering, unknown work type); otherwise the value returned by
     *         {@code receiveExecutable} / {@code receiveReservation}
     * @throws NumberFormatException if the submission date is not a parseable long
     */
    boolean jobArrives(IDuccWork job)
    {
        String methodName = "jobArrives";
        logger.trace(methodName, job.getDuccId(), "Job arives");
        logger.trace(methodName, job.getDuccId(), "Job is of type", job.getDuccType());

        // Convert Lou's structure into mine.
        IRmJob j = new RmJob(job.getDuccId());

        boolean mustRecover = isRecovered(job);             // UIMA-4142

        IDuccSchedulingInfo si  = job.getSchedulingInfo();
        IDuccStandardInfo   sti = job.getStandardInfo();

        String name = sti.getDescription();
        if ( name == null ) {
            name = "A Job With No Name.";
        }
        String user_name = sti.getUser().trim();
        j.setUserName(user_name);
        j.setJobName(name);
        j.setServiceId(toLong(job.getServiceId(), 0L)); // UIMA-4712 only non-zero on actual service instances

        int threads        = toInt(si.getThreadsPerProcess(), scheduler.getDefaultNThreads());
        int user_priority  = toInt(si.getSchedulingPriority(), 100);

        int total_work     = toInt(si.getWorkItemsTotal(), scheduler.getDefaultNTasks());
        int completed_work = toInt(si.getWorkItemsCompleted(), 0);
        // Never let this go 0 or negative - both cases are (probably user) errors.
        int remaining_work = Math.max(total_work - completed_work, 1);

        logger.info(methodName, job.getDuccId(), "total_work", total_work, "completed_work", completed_work,"remaining_work", remaining_work);

        int memory = toInt(si.getMemorySizeRequested(), scheduler.getDefaultMemory());

        // No explicit class: fall back to the configured default for this kind of work.
        String className = si.getSchedulingClass();
        if ( className == null ) {
            switch ( job.getDuccType() ) {
                case Job:
                    className = scheduler.getDefaultFairShareName();
                    break;
                case Service:
                case Pop:
                case Reservation:
                    className = scheduler.getDefaultReserveName();
                    break;
            }
            if ( className == null ) {
                // The type is otherwise only set further down; set it here so the refused job
                // carries it (presumably needed for legible refusal messages - confirm).
                j.setDuccType(job.getDuccType());
                // Use the converter-level refuse() like every other rejection in this method.
                // j is a local that nothing else references yet, so calling only j.refuse()
                // and returning would discard the refusal together with the object.
                refuse(j, "No scheduling class defined and no default class configured.");
                return false;
            }
        }

        j.setThreads(threads);
        j.setUserPriority(user_priority);
        j.setNQuestions(total_work, remaining_work, 0.0);
        j.setClassName(className);

        List<String> machineList = si.getMachineList();
        j.setMachineList(machineList);
        if ( machineList != null && !machineList.isEmpty() ) {
            String message = "machine list: "+String.join(" ", machineList);
            logger.info(methodName, job.getDuccId(), message);
        }

        switch ( si.getMemoryUnits() ) {
            case GB:
                break;
            default:
                // NOTE(review): the message says "Job returned." but processing continues
                // with the requested memory value as-is - confirm which is intended.
                logger.warn(methodName, job.getDuccId(), "Memory units other than GB are not currently supported.  Job returned.");
                break;
        }
        j.setMemory(memory);
        j.init();

        j.setTimestamp(Long.parseLong(sti.getDateOfSubmission()));

        if ( job instanceof IDuccWorkJob ) {
            j.setInitWait( ((IDuccWorkJob) job).isRunnable());
        } else {
            j.setInitWait(true);                          // pop is always ready to go
        }

        j.setDuccType(job.getDuccType());                 // ugly and artificial but ... not going to rant here
                                                          // it's needed so messages can be made legible
        switch ( job.getDuccType() ) {                    // UIMA-4142 to distinguish between service and AP
            case Service:
            case Pop:
                if ( ((IDuccWorkService) job).getServiceDeploymentType() == ServiceDeploymentType.other ) {
                    j.setArbitraryProcess();
                }
                break;
            default:
                break;
        }

        //
        // Now: must either create a new job, or recover one that we didn't know about, on the assumption that we
        // have just crashed and are recovering.
        //
        // Be SURE that if status is turned false for any reason, or if you exit early with false, that you
        // refuse() the job.
        //
        boolean status = true;

        ResourceClass rescl = scheduler.getResourceClass(className);
        if ( rescl == null ) {
            // oh darn, we can't continue past this point
            refuse(j, "Cannot find priority class " + className + " for job");

            // UIMA-4142
            // However, if this is recovery and we get here, it's because somehow the class definition
            // got deleted.  In this case there might be resources assigned.  We must evict if possible.
            // All affected hosts must be blacklisted.  We need to remember all this so we can unblacklist them
            // if the resources ever become free.
            blacklist(job, memory);
            return false;
        }

        if ( !rescl.authorized(user_name) ) {
            // UIMA-4275
            // if not recovering, and the class is not authorized, stop it dead here
            // if we are recovering, might no longer be authorized - the main scheduler will
            // deal with this as appropriate for the scheduling policy.
            refuse(j, "User '" + user_name + "' not authorized to use class '" + className + "'");
            if ( !mustRecover ) {
                return false;
            }
        }

        j.setResourceClass(rescl);

        switch ( job.getDuccType() ) {
            // UIMA-4275, must enforce max allocations as 1 for Service and Pop
            case Service:
            case Pop:
                switch ( rescl.getPolicy() ) {
                    case FAIR_SHARE:
                        // NOTE(review): the job is refused here but is still passed to
                        // receiveExecutable() below - confirm this is intended.
                        refuse(j, "Services and managed reservations are not allowed to be FAIR_SHARE");
                        break;

                    case FIXED_SHARE:
                    case RESERVE:
                        j.setMaxShares(1);
                        break;
                }
                status = receiveExecutable(j, job, mustRecover); // UIMA-4142, add mustRecover flag
                logger.trace(methodName, j.getId(), "Serivce, or Pop arrives, accepted:", status);
                break;

            case Job:
                // instance and share count are a function of the class
                int max_processes = toInt(si.getProcessesMax(), DEFAULT_PROCESSES);
                switch ( rescl.getPolicy() ) {
                    case FAIR_SHARE:
                    case FIXED_SHARE:
                    case RESERVE:
                        j.setMaxShares(max_processes);
                        break;
                }
                status = receiveExecutable(j, job, mustRecover); // UIMA-4142, add mustRecover flag
                logger.trace(methodName, j.getId(), "Job arrives, accepted:", status);
                break;

            case Reservation:
                // UIMA-4275. non-jobs restricted to exactly one allocation per request
                j.setMaxShares(1);

                status = receiveReservation(j, job, mustRecover);  // UIMA-4142, add mustRecover flag
                logger.trace(methodName, j.getId(), "Reservation arrives, accepted:", status);
                break;

            default:
                refuse(j, "Unknown job type: " + job.getDuccType());
                status = false;
                break;
        }

        return status;
    }