in uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java [1456:1540]
/**
 * Reconciles job states with the service dependency states supplied in
 * {@code serviceMap}. The log label "(SM)" and the rationale text used below
 * indicate the map originates from the service manager.
 *
 * <p>For every id in the map the corresponding job is looked up in the
 * orchestrator work map. Only jobs currently in
 * {@code JobState.WaitingForServices} are modified:
 * <ul>
 *   <li>services {@code Available}: job advances to
 *       {@code JobState.WaitingForResources};</li>
 *   <li>services {@code NotAvailable}, {@code Stopped} or {@code Stopping}:
 *       job moves to {@code JobState.Completing}, its completion date is set,
 *       and it is completed with
 *       {@code JobCompletionType.ServicesUnavailable};</li>
 *   <li>services {@code Pending}, {@code Waiting}, {@code Starting} or
 *       {@code Initializing}: nothing happens, the job keeps waiting.</li>
 * </ul>
 * Jobs in any other state are only logged. Ids with no matching job are
 * logged at debug level and skipped. If at least one job was changed, the
 * orchestrator checkpoint is saved before the work map monitor is released.
 *
 * <p>All work is done while holding the monitor of the work map.
 *
 * @param serviceMap per-job service dependency information, keyed by job id
 */
public void reconcileState(ServiceMap serviceMap) {
    String methodName = "reconcileState (SM)";
    logger.trace(methodName, null, messages.fetch("enter"));
    DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
    int changes = 0;
    Iterator<DuccId> serviceMapIterator = serviceMap.keySet().iterator();
    // NOTE(review): TrackSync presumably records lock wait/hold intervals for
    // diagnostics - confirm. Whatever it tracks, ended() must pair with await().
    TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
    try {
        synchronized(workMap) {
            ts.using();
            while(serviceMapIterator.hasNext()) {
                DuccId duccId = serviceMapIterator.next();
                ServiceDependency services = serviceMap.get(duccId);
                DuccWorkJob duccWorkJob = (DuccWorkJob) WorkMapHelper.findDuccWork(workMap, duccId, this, methodName);
                if(duccWorkJob == null) {
                    logger.debug(methodName, duccId, messages.fetch("job not found"));
                    continue;
                }
                // jobState is captured before any stateChange() call so the
                // log lines below report the state the job was found in.
                JobState jobState = duccWorkJob.getJobState();
                ServiceState serviceState = services.getState();
                switch(jobState) {
                case Received:
                    logger.warn(methodName, duccId, messages.fetchLabel("unexpected job state")+jobState);
                    break;
                case WaitingForDriver:
                    logger.debug(methodName, duccId, messages.fetchLabel("pending job state")+jobState);
                    break;
                case WaitingForServices:
                    switch(serviceState) {
                    case Pending: // UIMA-4223
                    case Waiting:
                    case Starting:
                    case Initializing:
                        // services not ready yet: job keeps waiting
                        break;
                    case Available:
                        stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForResources);
                        changes++;
                        logger.info(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
                        break;
                    case NotAvailable:
                    case Stopped:
                    case Stopping:
                        // services cannot be provided: terminate the job
                        stateJobAccounting.stateChange(duccWorkJob, JobState.Completing);
                        duccWorkJob.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
                        String sdm = getServiceDependencyMessages(services);
                        IRationale rationale = (sdm != null)
                                ? new Rationale("service manager reported "+sdm)
                                : new Rationale();
                        stateJobAccounting.complete(duccWorkJob, JobCompletionType.ServicesUnavailable, rationale);
                        changes++;
                        logger.info(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
                        break;
                    case Undefined:
                        logger.warn(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
                        break;
                    default:
                        break;
                    }
                    break;
                case WaitingForResources:
                case Assigned:
                case Initializing:
                case Running:
                case Completed:
                    // already past the services gate: report only
                    logger.debug(methodName, duccId, messages.fetchLabel("job state")+jobState+" "+messages.fetchLabel("services state")+serviceState);
                    break;
                case Undefined:
                    logger.warn(methodName, duccId, messages.fetchLabel("unexpected job state")+jobState);
                    break;
                default:
                    break;
                }
            }
            // persist once, and only if some job actually changed state
            if(changes > 0) {
                OrchestratorCheckpoint.getInstance().saveState();
            }
        }
    }
    finally {
        // Runs on the exceptional path too, so the tracking record opened by
        // await() is always closed even if the loop or the checkpoint throws.
        ts.ended();
    }
    logger.trace(methodName, null, messages.fetch("exit"));
}