in uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java [289:390]
/**
 * Makes one pruning pass over the given work map.
 *
 * For each entry: a job whose driver has terminated gets its driver
 * deallocated; a job or service that is completing with all processes
 * terminated is moved to {@code JobState.Completed}; a job, service or
 * reservation that is completed, saved and aged out is removed from the map
 * (jobs and services also have their process accounting entries removed).
 * If anything changed, the orchestrator checkpoint is saved.
 *
 * NOTE(review): entries are removed from {@code workMap} while its key set is
 * being iterated; this relies on the key set tolerating modification during
 * iteration - confirm against DuccWorkMap.
 *
 * @param workMap the work map to examine and prune
 * @return the number of changes made (deallocations, state changes, and
 *         removals of work items and processes)
 */
public int prune(DuccWorkMap workMap) {
    String methodName = "prune";
    dumper();
    int changes = 0;
    logger.trace(methodName, null, messages.fetch("enter"));
    long t0 = System.currentTimeMillis();
    Set<DuccId> workMapSet = workMap.keySet();
    logger.debug(methodName, jobid, "size="+workMapSet.size());
    for(DuccId duccId : workMapSet) {
        IDuccWork duccWork = WorkMapHelper.findDuccWork(workMap, duccId, this, methodName);
        if(duccWork == null) {
            logger.warn(methodName, duccId, "not found");
            continue;
        }
        DuccType duccType = duccWork.getDuccType();
        if(duccType == null) {
            logger.warn(methodName, duccId, "no type?");
            continue;
        }
        // duccWork is known to be non-null here, so the casts below cannot yield null.
        switch(duccType) {
        case Job:
        case Service:
            changes += pruneJobOrService(methodName, workMap, duccId, (DuccWorkJob)duccWork, duccType);
            break;
        case Reservation:
            changes += pruneReservation(methodName, workMap, duccId, (DuccWorkReservation)duccWork);
            break;
        default:
            break;
        }
    }
    long elapsed = System.currentTimeMillis() - t0;
    // Only report the elapsed time when the pass took longer than the sync limit.
    if(elapsed > Constants.SYNC_LIMIT) {
        logger.debug(methodName, null, "elapsed msecs: "+elapsed);
    }
    logger.debug(methodName, null, "processToWorkMap.size="+orchestratorCommonArea.getProcessAccounting().processCount());
    if(changes > 0) {
        OrchestratorCheckpoint.getInstance().saveState();
    }
    logger.trace(methodName, null, messages.fetch("exit"));
    return changes;
}

/**
 * Prunes a single job or service entry; returns the number of changes made.
 *
 * @param methodName location name used for logging and work map helper calls
 * @param workMap    the work map the entry belongs to
 * @param duccId     the work map key of the entry (used as the log id)
 * @param duccWorkJob the job or service to examine
 * @param duccType   the entry's type; the driver check applies to Job only
 * @return the number of changes made for this entry
 */
private int pruneJobOrService(String methodName, DuccWorkMap workMap, DuccId duccId, DuccWorkJob duccWorkJob, DuccType duccType) {
    int changes = 0;
    // Services share the rest of this path but skip the job driver check.
    if(duccType == DuccType.Job && jobDriverTerminated(duccWorkJob)) {
        OrchestratorHelper.jdDeallocate(duccWorkJob);
        changes++;
    }
    // This transition happens before the removal test, so an entry completed
    // here can be removed in the same pass.
    if(isCompleting(duccWorkJob) && allProcessesTerminated(duccWorkJob)) {
        stateJobAccounting.stateChange(duccWorkJob, JobState.Completed);
        changes++;
    }
    boolean removable = isCompleted(duccWorkJob)
            && allProcessesTerminated(duccWorkJob)
            && isSaved(duccWorkJob)
            && isAgedOut(duccWorkJob);
    if(!removable) {
        logger.debug(methodName, duccId, messages.fetch("processes active"));
        return changes;
    }
    WorkMapHelper.removeDuccWork(workMap, duccWorkJob, this, methodName);
    logger.info(methodName, duccId, messages.fetch("removed job"));
    changes++;
    // Driver processes first, then the job's own processes.
    DuccWorkPopDriver driver = duccWorkJob.getDriver();
    if(driver != null) {
        changes += dropProcessAccounting(methodName, duccId, driver.getProcessMap(), "removed driver process", "processes driver inactive");
    }
    changes += dropProcessAccounting(methodName, duccId, duccWorkJob.getProcessMap(), "removed process", "processes inactive");
    return changes;
}

/**
 * Removes a reservation from the work map once it is completed, saved and aged out.
 *
 * @param methodName location name used for logging and work map helper calls
 * @param workMap    the work map the entry belongs to
 * @param duccId     the work map key of the entry (used as the log id)
 * @param duccWorkReservation the reservation to examine
 * @return 1 if the reservation was removed, otherwise 0
 */
private int pruneReservation(String methodName, DuccWorkMap workMap, DuccId duccId, DuccWorkReservation duccWorkReservation) {
    if(duccWorkReservation.isCompleted() && isSaved(duccWorkReservation) && isAgedOut(duccWorkReservation)) {
        WorkMapHelper.removeDuccWork(workMap, duccWorkReservation, this, methodName);
        logger.info(methodName, duccId, messages.fetch("removed reservation"));
        return 1;
    }
    return 0;
}

/**
 * Removes every process in the given map from process accounting, logging each removal.
 *
 * @param methodName  location name used for logging
 * @param duccId      id of the owning work item (used as the log id)
 * @param processMap  processes to remove; may be null, in which case nothing happens
 * @param removedKey  message key logged for each removed process
 * @param inactiveKey message key logged once after all removals
 * @return the number of processes removed
 */
private int dropProcessAccounting(String methodName, DuccId duccId, IDuccProcessMap processMap, String removedKey, String inactiveKey) {
    if(processMap == null) {
        return 0;
    }
    int removed = 0;
    for(DuccId processDuccId : processMap.keySet()) {
        orchestratorCommonArea.getProcessAccounting().removeProcess(processDuccId);
        logger.info(methodName, duccId, messages.fetch(removedKey)+" "+processDuccId.toString());
        removed++;
    }
    logger.info(methodName, duccId, messages.fetch(inactiveKey));
    return removed;
}