in uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java [978:1214]
/**
 * Reconciles the orchestrator's work map with the per-work resource state
 * reported by the Resource Manager (RM).
 * <p>
 * For every entry in {@code rmResourceStateMap} the pending addition/removal
 * counts are logged and the matching work item is looked up in the work map.
 * A missing item is only logged. For a found item the previous RM reason is
 * cleared, and then, depending on its type:
 * <ul>
 * <li><b>Job</b>: {@code processPurger} is invoked with the RM-reported
 * resources, and the pending additions/removals are applied. In
 * WaitingForResources the RM reason is recorded. A refusal completes the job
 * as ResourcesUnavailable, and a non-empty process map moves it to Assigned.
 * In Assigned/Initializing/Running an empty process map moves it back to
 * WaitingForResources.</li>
 * <li><b>Reservation</b>: the pending additions/removals are applied. In
 * WaitingForResources the RM reason is recorded. A refusal completes the
 * reservation, except for the JobDriver scheduling class, where the refusal is
 * ignored and logged only once. Without a refusal, a non-empty RM resource set
 * moves it to Assigned. In Assigned an empty RM resource set completes it.</li>
 * <li><b>Service</b>: handled like a job, except that WaitingForResources moves
 * to Initializing. In addition, the service is completed as
 * ResourcesUnavailable when processes were purged in this pass and
 * {@code allProcessesTerminated} reports true.</li>
 * </ul>
 * All processing happens while holding the work-map lock. If any state change
 * was counted, the orchestrator checkpoint is saved before the lock is
 * released.
 *
 * @param rmResourceStateMap RM resource state keyed by work id; must not be null
 * @throws Exception propagated from the helpers invoked during reconciliation
 */
public void reconcileState(Map<DuccId, IRmJobState> rmResourceStateMap) throws Exception {
	String methodName = "reconcileState (RM)";
	logger.trace(methodName, null, messages.fetch("enter"));
	logger.debug(methodName, null, messages.fetchLabel("size")+rmResourceStateMap.size());
	DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
	int changes = 0;
	TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
	try {
		synchronized(workMap) {
			ts.using();
			for(Map.Entry<DuccId, IRmJobState> entry : rmResourceStateMap.entrySet()) {
				DuccId duccId = entry.getKey();
				IRmJobState rmResourceState = entry.getValue();
				// Non-empty pending changes are logged at info; empty ones only at trace.
				Map<DuccId, IResource> mapAdditions = rmResourceState.getPendingAdditions();
				if(mapAdditions != null) {
					int mapSize = mapAdditions.size();
					if(mapSize > 0) {
						logger.info(methodName, duccId, messages.fetchLabel("pending additions")+mapSize);
					}
					else {
						logger.trace(methodName, duccId, messages.fetchLabel("pending additions")+mapSize);
					}
				}
				Map<DuccId, IResource> mapRemovals = rmResourceState.getPendingRemovals();
				if(mapRemovals != null) {
					int mapSize = mapRemovals.size();
					if(mapSize > 0) {
						logger.info(methodName, duccId, messages.fetchLabel("pending removals")+mapSize);
					}
					else {
						logger.trace(methodName, duccId, messages.fetchLabel("pending removals")+mapSize);
					}
				}
				IDuccWork duccWork = WorkMapHelper.findDuccWork(workMap, duccId, this, methodName);
				if(duccWork== null) {
					logger.debug(methodName, duccId, messages.fetch("not found"));
				}
				else {
					logger.trace(methodName, duccId, messages.fetchLabel("type")+duccWork.getDuccType());
					// Clear any stale RM reason; it is re-set below only while waiting for resources.
					duccWork.setRmReason(null);
					switch(duccWork.getDuccType()) {
					case Job:
						logger.trace(methodName, duccId, messages.fetch("processing job..."));
						DuccWorkJob duccWorkJob = (DuccWorkJob) duccWork;
						processPurger(duccWorkJob,rmResourceState.getResources());
						changes += processMapResourcesAdd(duccWorkJob,rmResourceState.memoryGbPerProcess(),rmResourceState.getPendingAdditions());
						changes += processMapResourcesDel(duccWorkJob,rmResourceState.memoryGbPerProcess(),rmResourceState.getPendingRemovals());
						JobState jobState = duccWorkJob.getJobState();
						logger.trace(methodName, duccId, messages.fetchLabel("job state")+jobState);
						switch(jobState) {
						case Received:
						case WaitingForDriver:
							logger.warn(methodName, duccId, messages.fetchLabel("unexpected state")+jobState);
							break;
						case WaitingForServices:
							logger.debug(methodName, duccId, messages.fetchLabel("unexpected state")+jobState);
							break;
						case WaitingForResources:
							String rmReason = rmResourceState.getReason();
							logger.trace(methodName, duccId, messages.fetchLabel("rmReason")+rmReason);
							duccWork.setRmReason(rmReason);
							if(rmResourceState.isRefused()) {
								duccWorkJob.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
								duccWorkJob.setCompletionType(JobCompletionType.ResourcesUnavailable);
								duccWorkJob.setCompletionRationale(new Rationale("resource manager refused allocation: "+rmResourceState.getReason()));
								changes += stateChange(duccWorkJob,JobState.Completed);
								logger.warn(methodName, duccId, messages.fetchLabel("refused")+rmResourceState.getReason());
								String text = rmResourceState.getReason();
								UserLogging.record(duccWorkJob, text);
							}
							// NOTE(review): this check also runs after a refusal above; if the
							// process map is non-empty, a just-completed job is asked to move to
							// Assigned. Confirm stateChange rejects that transition.
							if(duccWorkJob.getProcessMap().size() > 0) {
								changes += stateChange(duccWorkJob,JobState.Assigned);
								logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkJob.getProcessMap().size());
							}
							break;
						case Assigned:
						case Initializing:
						case Running:
							if(duccWorkJob.getProcessMap().size() == 0) {
								changes += stateChange(duccWorkJob,JobState.WaitingForResources);
								logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkJob.getProcessMap().size());
							}
							break;
						case Completing:
						case Completed:
							logger.debug(methodName, duccId, messages.fetchLabel("unsuitable state")+jobState);
							break;
						case Undefined:
							logger.warn(methodName, duccId, messages.fetchLabel("unsuitable state")+jobState);
							break;
						}
						break;
					case Reservation:
						logger.trace(methodName, duccId, messages.fetch("processing reservation..."));
						DuccWorkReservation duccWorkReservation = (DuccWorkReservation) duccWork;
						changes += reservationMapResourcesAdd(duccWorkReservation,rmResourceState.memoryGbPerProcess(),rmResourceState.getPendingAdditions());
						changes += reservationMapResourcesDel(duccWorkReservation,rmResourceState.memoryGbPerProcess(),rmResourceState.getPendingRemovals());
						ReservationState reservationState = duccWorkReservation.getReservationState();
						logger.trace(methodName, duccId, messages.fetchLabel("reservation state")+reservationState);
						switch(reservationState) {
						case Received:
							logger.warn(methodName, duccId, messages.fetchLabel("unexpected state")+reservationState);
							break;
						case WaitingForResources:
							String rmReason = rmResourceState.getReason();
							logger.trace(methodName, duccId, messages.fetchLabel("rmReason")+rmReason);
							duccWork.setRmReason(rmReason);
							if(rmResourceState.isRefused()) {
								String schedulingClass = duccWorkReservation.getSchedulingInfo().getSchedulingClass().trim();
								if(schedulingClass.equals(DuccSchedulerClasses.JobDriver)) {
									// JobDriver refusals do not complete the reservation; warn only once.
									if(!refusedLogged.get()) {
										logger.warn(methodName, duccId, messages.fetchLabel("refusal ignored")+rmResourceState.getReason());
										refusedLogged.set(true);
									}
								}
								else {
									duccWorkReservation.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
									duccWorkReservation.setCompletionType(ReservationCompletionType.ResourcesUnavailable);
									duccWorkReservation.setCompletionRationale(new Rationale("resource manager refused allocation: "+rmResourceState.getReason()));
									changes += stateChange(duccWorkReservation,ReservationState.Completed);
									logger.warn(methodName, duccId, messages.fetchLabel("refused")+rmResourceState.getReason());
								}
							}
							else {
								if(rmResourceState.getResources() != null) {
									if(!rmResourceState.getResources().isEmpty()) {
										changes += stateChange(duccWorkReservation,ReservationState.Assigned);
										logger.info(methodName, duccId, messages.fetchLabel("resources count")+rmResourceState.getResources().size());
									}
								}
								else {
									logger.info(methodName, duccId, messages.fetch("waiting...no resources?"));
								}
							}
							break;
						case Assigned:
							if(rmResourceState.getResources() != null) {
								if(rmResourceState.getResources().isEmpty()) {
									changes += stateChange(duccWorkReservation,ReservationState.Completed);
									logger.info(methodName, duccId, messages.fetchLabel("resources count")+rmResourceState.getResources().size());
								}
							}
							else {
								logger.info(methodName, duccId, messages.fetch("assigned...no resources?"));
							}
							break;
						case Completed:
							logger.debug(methodName, duccId, messages.fetchLabel("unsuitable state")+reservationState);
							break;
						case Undefined:
							logger.warn(methodName, duccId, messages.fetchLabel("unsuitable state")+reservationState);
							break;
						}
						break;
					case Service:
						logger.trace(methodName, duccId, messages.fetch("processing service..."));
						DuccWorkJob duccWorkService = (DuccWorkJob) duccWork;
						int processPurged = processPurger(duccWorkService,rmResourceState.getResources());
						changes += processMapResourcesAdd(duccWorkService,rmResourceState.memoryGbPerProcess(),rmResourceState.getPendingAdditions());
						changes += processMapResourcesDel(duccWorkService,rmResourceState.memoryGbPerProcess(),rmResourceState.getPendingRemovals());
						JobState serviceState = duccWorkService.getJobState();
						logger.trace(methodName, duccId, messages.fetchLabel("service state")+serviceState);
						switch(serviceState) {
						case Received:
							logger.warn(methodName, duccId, messages.fetchLabel("unexpected state")+serviceState);
							break;
						case WaitingForServices:
							logger.debug(methodName, duccId, messages.fetchLabel("unexpected state")+serviceState);
							break;
						case WaitingForResources:
							String rmReason = rmResourceState.getReason();
							logger.trace(methodName, duccId, messages.fetchLabel("rmReason")+rmReason);
							duccWork.setRmReason(rmReason);
							if(rmResourceState.isRefused()) {
								duccWorkService.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
								duccWorkService.setCompletionType(JobCompletionType.ResourcesUnavailable);
								duccWorkService.setCompletionRationale(new Rationale("resource manager refused allocation: "+rmResourceState.getReason()));
								changes += stateChange(duccWorkService,JobState.Completed);
								logger.warn(methodName, duccId, messages.fetchLabel("refused")+rmResourceState.getReason());
								String text = rmResourceState.getReason();
								UserLogging.record(duccWorkService, text);
							}
							// NOTE(review): as for jobs, this runs even after a refusal above;
							// confirm stateChange rejects Completed -> Initializing.
							if(duccWorkService.getProcessMap().size() > 0) {
								changes += stateChange(duccWorkService,JobState.Initializing);
								logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkService.getProcessMap().size());
							}
							// Complete the service if this pass purged processes and none remain alive.
							if((processPurged > 0) && allProcessesTerminated(duccWorkService)) {
								duccWorkService.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
								duccWorkService.setCompletionType(JobCompletionType.ResourcesUnavailable);
								duccWorkService.setCompletionRationale(new Rationale("resource manager purged allocation: "+rmResourceState.getReason()));
								changes += stateChange(duccWorkService,JobState.Completed);
								logger.warn(methodName, duccId, messages.fetchLabel("purged")+rmResourceState.getReason());
								String text = rmResourceState.getReason();
								UserLogging.record(duccWorkService, text);
							}
							break;
						case Assigned:
						case Initializing:
						case Running:
							if(duccWorkService.getProcessMap().size() == 0) {
								changes += stateChange(duccWorkService,JobState.WaitingForResources);
								logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkService.getProcessMap().size());
							}
							// Complete the service if this pass purged processes and none remain alive.
							if((processPurged > 0) && allProcessesTerminated(duccWorkService)) {
								duccWorkService.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
								duccWorkService.setCompletionType(JobCompletionType.ResourcesUnavailable);
								duccWorkService.setCompletionRationale(new Rationale("resource manager purged allocation: "+rmResourceState.getReason()));
								changes += stateChange(duccWorkService,JobState.Completed);
								logger.warn(methodName, duccId, messages.fetchLabel("purged")+rmResourceState.getReason());
								String text = rmResourceState.getReason();
								UserLogging.record(duccWorkService, text);
							}
							break;
						case Completing:
						case Completed:
							logger.debug(methodName, duccId, messages.fetchLabel("unsuitable state")+serviceState);
							break;
						case Undefined:
							logger.warn(methodName, duccId, messages.fetchLabel("unsuitable state")+serviceState);
							break;
						default:
							break;
						}
						break;
					default:
						break;
					}
				}
			}
			// Persist only when at least one change was counted, while still holding the lock.
			if(changes > 0) {
				OrchestratorCheckpoint.getInstance().saveState();
			}
		}
	}
	finally {
		// Always close out the TrackSync obtained above, even if an exception escapes
		// the synchronized section; otherwise the tracking entry would never be ended.
		ts.ended();
	}
	logger.trace(methodName, null, messages.fetch("exit"));
}