in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java [897:1022]
/**
 * Reconciles a freshly published work map against this converter's local map.
 *
 * <p>Processing order:
 * <ol>
 *   <li>Discard the event if the map is empty or the scheduler is not yet initialized.</li>
 *   <li>If the scheduler reports that recovery is required, reset the local state as if the
 *       RM had just been booted and treat this publication as the first one.</li>
 *   <li>On the first publication, attempt recovery from the published map; when the
 *       {@code recovery} flag is set the scheduler is started immediately.</li>
 *   <li>Discard the event if the scheduler is still not ready.</li>
 *   <li>Diff the published map against the local map and handle, in this order: work that is
 *       new, work that has disappeared, and work present in both maps.</li>
 * </ol>
 *
 * @param jobMap the published work map; an empty map is logged and ignored
 * @throws SchedulingException if work present in both maps arrives with type {@code Undefined}
 */
public void eventArrives(IDuccWorkMap jobMap)
{
    String methodName = "eventArrives";

    // Nothing was published, so there is nothing to reconcile.
    if ( jobMap.size() == 0 ) {
        logger.debug(methodName, null, "No state from Orchestrator");
        return;
    }

    // Handle the race where an OR publication comes in while RM is (re)configuring itself.
    if ( ! scheduler.isInitialized() ) {
        return;
    }

    // UIMA-4142: a reconfig happened. This is checked independently of isInitialized() because
    // a reinit could complete entirely between two OR publications.
    if ( scheduler.mustRecover() ) {
        localMap = new DuccWorkMap();                    // as if RM had been booted
        lastJobManagerUpdate = new JobManagerUpdate();
        blacklistedResources.clear();                    // UIMA-4142
        refusedJobs.clear();
        first_or_state = true;
    }

    if ( first_or_state ) {
        // The flag is cleared up front, so recovery is attempted once per (re)start.
        first_or_state = false;
        scheduler.setRecovery(false);

        boolean recovered = recoverFromOrchestrator(jobMap);
        if ( ! recovered ) {
            logger.info(methodName, null, "There are no active jobs in map so can't build up state. Waiting for init stability.");
            return;
        }

        if ( recovery ) {
            logger.info(methodName, null, "Fast recovery is enabled: Recovered state from Orchestrator, starting scheduler.");
            scheduler.start();
        }
    }

    // The scheduler is readied either by fast-recovery or by init stability.
    if ( ! scheduler.ready() ) {
        logger.info(methodName, null, "Orchestrator event is discarded: scheduler is waiting for init stability or is paused for reconfig..");
        return;
    }

    // Left side of the difference is the published map, right side is the local map.
    @SuppressWarnings("unchecked")
    DuccMapDifference<DuccId, IDuccWork> delta = DuccCollectionUtils.difference(jobMap, localMap);

    for ( Object entry : jobMap.values() ) {
        IDuccWork published = (IDuccWork) entry;
        logger.trace(methodName, published.getDuccId(), "Arrives in JmStateEvent state =", published.getStateObject());
    }

    //
    // Pass 1: work that is in the publication but not yet in the local map.
    //
    Map<DuccId, IDuccWork> arrivals = delta.getLeft();
    for ( IDuccWork fresh : arrivals.values() ) {
        if ( ! fresh.isSchedulable() ) {
            logger.info(methodName, fresh.getDuccId(), "Received non-schedulable job, state = ", fresh.getStateObject());
            whitelist(fresh);                            // UIMA-4142 if blacklisted, clear everything
            continue;
        }

        logger.info(methodName, fresh.getDuccId(), "Incoming, state = ", fresh.getStateObject());
        try {
            // A false return means something is wrong with the work; ignore it for now.
            boolean accepted = jobArrives(fresh);
            if ( accepted ) {
                localMap.addDuccWork(fresh);
            }
        } catch ( Exception e ) {
            logger.error(methodName, fresh.getDuccId(), "Can't receive job because of exception", e);
        }
    }

    //
    // Pass 2: work that is in the local map but no longer in the publication.
    //
    Map<DuccId, IDuccWork> departures = delta.getRight();
    for ( IDuccWork vanished : departures.values() ) {
        logger.info(methodName, vanished.getDuccId(), "Gone");
        jobRemoved(vanished.getDuccId());
    }

    //
    // Pass 3: work present on both sides. Left is incoming, right is what the local map holds.
    //
    for ( DuccMapValueDifference<IDuccWork> pair : delta ) {
        IDuccWork known    = pair.getRight();
        IDuccWork incoming = pair.getLeft();

        if ( ! incoming.isSchedulable() ) {
            logger.info(methodName, incoming.getDuccId(), "Removing unschedulable:", known.getStateObject(), "->", incoming.getStateObject());
            jobRemoved(known.getDuccId());
            continue;
        }

        // Still schedulable and already known: just sync the state.
        localMap.addDuccWork(incoming);
        scheduler.signalState(incoming.getDuccId(), incoming.getStateObject().toString());

        switch ( incoming.getDuccType() ) {
            case Job:
                jobUpdate(known.getStateObject(), incoming);
                reconcileProcesses(incoming.getDuccId(), incoming, known);
                break;

            case Service:
            case Pop: {
                // Deployment type "other" is really an AP: OR sets its state to running
                // immediately although it isn't yet, so the information is incomplete and we
                // always have to reconcile. Service state does come in correctly, so a service
                // is reconciled only when its state object differs.
                String tag = null;
                if ( ((IDuccWorkService) incoming).getServiceDeploymentType() == ServiceDeploymentType.other ) {
                    tag = "[P] State: ";
                } else if ( known.getStateObject() != incoming.getStateObject() ) {
                    tag = "[S] State: ";
                }
                if ( tag != null ) {
                    logger.info(methodName, incoming.getDuccId(), tag, known.getStateObject(), "->", incoming.getStateObject());
                    reconcileProcesses(incoming.getDuccId(), incoming, known);
                }
                break;
            }

            case Reservation:
                // For the moment reservations have nothing to reconcile; only log the transition.
                if ( known.getStateObject() != incoming.getStateObject() ) {
                    logger.info(methodName, incoming.getDuccId(), "[R] State: ", known.getStateObject(), "->", incoming.getStateObject());
                }
                break;

            case Undefined:
                throw new SchedulingException(incoming.getDuccId(), "Work arrives as type Undefined - should have been filtered out by now.");
        }
    }

    logger.trace(methodName, null, "Done with JmStateDuccEvent with some jobs processed");
}