public void eventArrives()

in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java [897:1022]


    /**
     * Handles one state publication from the Orchestrator (OR).
     *
     * <p>The publication is dropped without further processing when the map is empty, when the
     * scheduler is not initialized, when the first-publication call to
     * {@code recoverFromOrchestrator} reports failure, or when the scheduler is not ready.
     * Otherwise the incoming map is diffed against {@code localMap} and the result is applied in
     * three passes: work found only in the incoming map, work found only in {@code localMap},
     * and work found in both.
     *
     * @param jobMap the work map published by the Orchestrator
     * @throws SchedulingException if work that is present in both maps and still schedulable
     *                             reports type {@code Undefined}
     */
    public void eventArrives(IDuccWorkMap jobMap)
    {
        String methodName = "eventArrives";

        // An empty publication carries nothing to act on.
        if ( jobMap.size() == 0 ) {
            logger.debug(methodName, null, "No state from Orchestrator");
            return;
        }

        // Race guard: an OR publication may show up while RM is still (re)configuring itself.
        if ( ! scheduler.isInitialized() ) {
            return;
        }

        // UIMA-4142: a reconfig happened.  Checked separately from isInitialized() because a complete
        // reinit can take place between two OR publications.  Reset everything as if RM had just booted.
        if ( scheduler.mustRecover() ) {
            localMap = new DuccWorkMap();
            lastJobManagerUpdate = new JobManagerUpdate();
            blacklistedResources.clear();
            refusedJobs.clear();
            first_or_state = true;
        }

        // First publication since boot (or since the reset above): try to rebuild state from it.
        if ( first_or_state ) {
            first_or_state = false;
            scheduler.setRecovery(false);

            boolean stateRebuilt = recoverFromOrchestrator(jobMap);
            if ( ! stateRebuilt ) {
                logger.info(methodName, null, "There are no active jobs in map so can't build up state. Waiting for init stability.");
                return;
            }

            if ( recovery ) {
                logger.info(methodName, null, "Fast recovery is enabled: Recovered state from Orchestrator, starting scheduler.");
                scheduler.start();
            }
        }

        // The scheduler becomes ready either through fast recovery or through init stability.
        if ( ! scheduler.ready() ) {
            logger.info(methodName, null, "Orchestrator event is discarded: scheduler is waiting for init stability or is paused for reconfig..");
            return;
        }

        // Left side of the difference is the incoming publication, right side is our local map.
        @SuppressWarnings("unchecked")
        DuccMapDifference<DuccId, IDuccWork> delta = DuccCollectionUtils.difference(jobMap, localMap);

        for ( Object entry : jobMap.values() ) {
            IDuccWork published = (IDuccWork) entry;
            logger.trace(methodName, published.getDuccId(), "Arrives in JmStateEvent state =", published.getStateObject());
        }

        //
        // Pass 1: work that appears only in the incoming publication.
        //
        Map<DuccId, IDuccWork> arrivals = delta.getLeft();
        for ( IDuccWork work : arrivals.values() ) {

            if ( ! work.isSchedulable() ) {
                logger.info(methodName, work.getDuccId(), "Received non-schedulable job, state = ", work.getStateObject());
                whitelist(work);                       // UIMA-4142 if blacklisted, clear everything
                continue;
            }

            logger.info(methodName, work.getDuccId(), "Incoming, state = ", work.getStateObject());
            try {
                // A false return means something is wrong with this work; it is ignored for now.
                if ( jobArrives(work) ) {
                    localMap.addDuccWork(work);
                }
            } catch ( Exception e ) {
                logger.error(methodName, work.getDuccId(), "Can't receive job because of exception", e);
            }
        }

        //
        // Pass 2: work that appears only in our local map.
        //
        Map<DuccId, IDuccWork> departures = delta.getRight();
        for ( IDuccWork work : departures.values() ) {
            logger.info(methodName, work.getDuccId(), "Gone");
            jobRemoved(work.getDuccId());
        }

        //
        // Pass 3: work that appears on both sides.  "incoming" is the published copy (left),
        // "known" is the copy already held in the local map (right).
        //
        for ( DuccMapValueDifference<IDuccWork> pair : delta ) {
            IDuccWork known    = pair.getRight();
            IDuccWork incoming = pair.getLeft();

            if ( ! incoming.isSchedulable() ) {
                logger.info(methodName, incoming.getDuccId(), "Removing unschedulable:", known.getStateObject(), "->", incoming.getStateObject());
                jobRemoved(known.getDuccId());
                continue;
            }

            // Still schedulable and already known: sync the local copy, then reconcile by work type.
            localMap.addDuccWork(incoming);
            scheduler.signalState(incoming.getDuccId(), incoming.getStateObject().toString());

            switch ( incoming.getDuccType() ) {
                case Job:
                    jobUpdate(known.getStateObject(), incoming);
                    reconcileProcesses(incoming.getDuccId(), incoming, known);
                    break;

                case Service:
                case Pop: {
                    // Deployment type "other" is really an AP: OR marks it running immediately even
                    // though it is not yet, so its information is incomplete and it must always be
                    // reconciled.  Real service state does come in correctly, so a service is
                    // reconciled only when its state has changed.
                    String stateTag = null;
                    if ( ((IDuccWorkService) incoming).getServiceDeploymentType() == ServiceDeploymentType.other ) {
                        stateTag = "[P] State: ";
                    } else if ( known.getStateObject() != incoming.getStateObject() ) {
                        stateTag = "[S] State: ";
                    }

                    if ( stateTag != null ) {
                        logger.info(methodName, incoming.getDuccId(), stateTag, known.getStateObject(), "->", incoming.getStateObject());
                        reconcileProcesses(incoming.getDuccId(), incoming, known);
                    }
                    break;
                }

                case Reservation:
                    // Reservations currently have nothing to reconcile; a state change is only logged.
                    if ( known.getStateObject() != incoming.getStateObject() ) {
                        logger.info(methodName, incoming.getDuccId(), "[R] State: ", known.getStateObject(), "->", incoming.getStateObject());
                    }
                    break;

                case Undefined:
                    throw new SchedulingException(incoming.getDuccId(), "Work arrives as type Undefined - should have been filtered out by now.");
            }
        }

        logger.trace(methodName, null, "Done with JmStateDuccEvent with some jobs processed");
    }