public void reconcileState()

in uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java [978:1214]


	/**
	 * Reconciles the orchestrator's work map with the per-work state reported by the
	 * Resource Manager (RM).
	 * <p>
	 * For every entry of {@code rmResourceStateMap} whose id is found in the work map, the
	 * pending resource additions/removals are applied and the job, reservation or service
	 * state is advanced according to what the RM reports (refusal, resources present,
	 * resources gone, processes purged). Entries whose id is not found in the work map are
	 * only logged at debug level. If at least one change was counted, the orchestrator
	 * checkpoint is saved before the work map monitor is released.
	 * <p>
	 * All processing happens while holding the monitor of the work map. The
	 * {@code TrackSync} handle obtained before entering the monitor is always closed via
	 * {@code ended()}, even when processing throws.
	 * NOTE(review): presumably TrackSync records who waits for / holds the work map lock;
	 * confirm against TrackSync.
	 *
	 * @param rmResourceStateMap RM state keyed by work id; must not be {@code null}
	 * @throws Exception if any reconciliation step fails; the exception is propagated unchanged
	 */
	public void reconcileState(Map<DuccId, IRmJobState> rmResourceStateMap) throws Exception {
		String methodName = "reconcileState (RM)";
		logger.trace(methodName, null, messages.fetch("enter"));
		logger.debug(methodName, null, messages.fetchLabel("size")+rmResourceStateMap.size());
		DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
		int changes = 0;
		TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
		try {
			synchronized(workMap) {
				ts.using();
				// Iterate entries directly rather than looking each key up a second time.
				for(Map.Entry<DuccId, IRmJobState> entry : rmResourceStateMap.entrySet()) {
					DuccId duccId = entry.getKey();
					IRmJobState rmResourceState = entry.getValue();
					logPendingResourceCount(methodName, duccId, "pending additions", rmResourceState.getPendingAdditions());
					logPendingResourceCount(methodName, duccId, "pending removals", rmResourceState.getPendingRemovals());
					IDuccWork duccWork = WorkMapHelper.findDuccWork(workMap, duccId, this, methodName);
					if(duccWork== null) {
						logger.debug(methodName, duccId, messages.fetch("not found"));
					}
					else {
						logger.trace(methodName, duccId, messages.fetchLabel("type")+duccWork.getDuccType());
						// Clear any previous RM reason; it is set again only while waiting for resources.
						duccWork.setRmReason(null);
						switch(duccWork.getDuccType()) {
						case Job:
							logger.trace(methodName, duccId, messages.fetch("processing job..."));
							DuccWorkJob duccWorkJob = (DuccWorkJob) duccWork;
							processPurger(duccWorkJob,rmResourceState.getResources());
							changes += processMapResourcesAdd(duccWorkJob,rmResourceState.memoryGbPerProcess(),rmResourceState.getPendingAdditions());
							changes += processMapResourcesDel(duccWorkJob,rmResourceState.memoryGbPerProcess(),rmResourceState.getPendingRemovals());
							JobState jobState = duccWorkJob.getJobState();
							logger.trace(methodName, duccId, messages.fetchLabel("job state")+jobState);
							switch(jobState) {
							case Received:
							case WaitingForDriver:
								logger.warn(methodName, duccId, messages.fetchLabel("unexpected state")+jobState);
								break;
							case WaitingForServices:
								logger.debug(methodName, duccId, messages.fetchLabel("unexpected state")+jobState);
								break;
							case WaitingForResources:
								String rmReason = rmResourceState.getReason();
								logger.trace(methodName, duccId, messages.fetchLabel("rmReason")+rmReason);
								duccWork.setRmReason(rmReason);
								if(rmResourceState.isRefused()) {
									changes += completeResourcesUnavailable(duccWorkJob, rmResourceState, "refused", methodName, duccId);
								}
								// Checked regardless of a refusal above, as in the original flow.
								if(duccWorkJob.getProcessMap().size() > 0) {
									changes += stateChange(duccWorkJob,JobState.Assigned);
									logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkJob.getProcessMap().size());
								}
								break;
							case Assigned:
							case Initializing:
							case Running:
								// No processes left: fall back to waiting for resources.
								if(duccWorkJob.getProcessMap().size() == 0) {
									changes += stateChange(duccWorkJob,JobState.WaitingForResources);
									logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkJob.getProcessMap().size());
								}
								break;
							case Completing:
							case Completed:
								logger.debug(methodName, duccId, messages.fetchLabel("unsuitable state")+jobState);
								break;
							case Undefined:
								logger.warn(methodName, duccId, messages.fetchLabel("unsuitable state")+jobState);
								break;
							default:
								break;
							}
							break;
						case Reservation:
							logger.trace(methodName, duccId, messages.fetch("processing reservation..."));
							DuccWorkReservation duccWorkReservation = (DuccWorkReservation) duccWork;
							changes += reservationMapResourcesAdd(duccWorkReservation,rmResourceState.memoryGbPerProcess(),rmResourceState.getPendingAdditions());
							changes += reservationMapResourcesDel(duccWorkReservation,rmResourceState.memoryGbPerProcess(),rmResourceState.getPendingRemovals());
							ReservationState reservationState = duccWorkReservation.getReservationState();
							logger.trace(methodName, duccId, messages.fetchLabel("reservation state")+reservationState);
							switch(reservationState) {
							case Received:
								logger.warn(methodName, duccId, messages.fetchLabel("unexpected state")+reservationState);
								break;
							case WaitingForResources:
								String rmReason = rmResourceState.getReason();
								logger.trace(methodName, duccId, messages.fetchLabel("rmReason")+rmReason);
								duccWork.setRmReason(rmReason);
								if(rmResourceState.isRefused()) {
									String schedulingClass = duccWorkReservation.getSchedulingInfo().getSchedulingClass().trim();
									if(schedulingClass.equals(DuccSchedulerClasses.JobDriver)) {
										// A refusal for the JobDriver class is ignored; warn about it only once.
										if(!refusedLogged.get()) {
											logger.warn(methodName, duccId, messages.fetchLabel("refusal ignored")+rmResourceState.getReason());
											refusedLogged.set(true);
										}
									}
									else {
										duccWorkReservation.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
										duccWorkReservation.setCompletionType(ReservationCompletionType.ResourcesUnavailable);
										duccWorkReservation.setCompletionRationale(new Rationale("resource manager refused allocation: "+rmResourceState.getReason()));
										changes += stateChange(duccWorkReservation,ReservationState.Completed);
										logger.warn(methodName, duccId, messages.fetchLabel("refused")+rmResourceState.getReason());
									}
								}
								else {
									if(rmResourceState.getResources() != null) {
										if(!rmResourceState.getResources().isEmpty()) {
											changes += stateChange(duccWorkReservation,ReservationState.Assigned);
											logger.info(methodName, duccId, messages.fetchLabel("resources count")+rmResourceState.getResources().size());
										}
									}
									else {
										logger.info(methodName, duccId, messages.fetch("waiting...no resources?"));
									}
								}
								break;
							case Assigned:
								if(rmResourceState.getResources() != null) {
									// An assigned reservation whose resources are all gone is completed.
									if(rmResourceState.getResources().isEmpty()) {
										changes += stateChange(duccWorkReservation,ReservationState.Completed);
										logger.info(methodName, duccId, messages.fetchLabel("resources count")+rmResourceState.getResources().size());
									}
								}
								else {
									logger.info(methodName, duccId, messages.fetch("assigned...no resources?"));
								}
								break;
							case Completed:
								logger.debug(methodName, duccId, messages.fetchLabel("unsuitable state")+reservationState);
								break;
							case Undefined:
								logger.warn(methodName, duccId, messages.fetchLabel("unsuitable state")+reservationState);
								break;
							default:
								break;
							}
							break;
						case Service:
							logger.trace(methodName, duccId, messages.fetch("processing service..."));
							DuccWorkJob duccWorkService = (DuccWorkJob) duccWork;
							int processPurged = processPurger(duccWorkService,rmResourceState.getResources());
							changes += processMapResourcesAdd(duccWorkService,rmResourceState.memoryGbPerProcess(),rmResourceState.getPendingAdditions());
							changes += processMapResourcesDel(duccWorkService,rmResourceState.memoryGbPerProcess(),rmResourceState.getPendingRemovals());
							JobState serviceState = duccWorkService.getJobState();
							logger.trace(methodName, duccId, messages.fetchLabel("service state")+serviceState);
							switch(serviceState) {
							case Received:
								logger.warn(methodName, duccId, messages.fetchLabel("unexpected state")+serviceState);
								break;
							case WaitingForServices:
								logger.debug(methodName, duccId, messages.fetchLabel("unexpected state")+serviceState);
								break;
							case WaitingForResources:
								String rmReason = rmResourceState.getReason();
								logger.trace(methodName, duccId, messages.fetchLabel("rmReason")+rmReason);
								duccWork.setRmReason(rmReason);
								if(rmResourceState.isRefused()) {
									changes += completeResourcesUnavailable(duccWorkService, rmResourceState, "refused", methodName, duccId);
								}
								// Unlike a job, a service with processes moves to Initializing here.
								if(duccWorkService.getProcessMap().size() > 0) {
									changes += stateChange(duccWorkService,JobState.Initializing);
									logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkService.getProcessMap().size());
								}
								if((processPurged > 0) && allProcessesTerminated(duccWorkService)) {
									changes += completeResourcesUnavailable(duccWorkService, rmResourceState, "purged", methodName, duccId);
								}
								break;
							case Assigned:
							case Initializing:
							case Running:
								if(duccWorkService.getProcessMap().size() == 0) {
									changes += stateChange(duccWorkService,JobState.WaitingForResources);
									logger.info(methodName, duccId, messages.fetchLabel("resources count")+duccWorkService.getProcessMap().size());
								}
								if((processPurged > 0) && allProcessesTerminated(duccWorkService)) {
									changes += completeResourcesUnavailable(duccWorkService, rmResourceState, "purged", methodName, duccId);
								}
								break;
							case Completing:
							case Completed:
								logger.debug(methodName, duccId, messages.fetchLabel("unsuitable state")+serviceState);
								break;
							case Undefined:
								logger.warn(methodName, duccId, messages.fetchLabel("unsuitable state")+serviceState);
								break;
							default:
								break;
							}
							break;
						default:
							break;
						}
					}
				}
				// Persist only when something was actually changed, while still holding the monitor.
				if(changes > 0) {
					OrchestratorCheckpoint.getInstance().saveState();
				}
			}
		}
		finally {
			// Always pair await()/using() with ended(), including when an exception escapes.
			ts.ended();
		}
		logger.trace(methodName, null, messages.fetch("exit"));
	}

	/**
	 * Logs the size of a pending additions/removals map reported by the RM: info level when
	 * the map is non-empty, trace level when it is empty, nothing when it is {@code null}.
	 *
	 * @param methodName name of the calling method, used as the log location
	 * @param duccId     id of the work item the map belongs to
	 * @param label      message label passed to {@code messages.fetchLabel}
	 * @param pending    the pending map, may be {@code null}
	 */
	private void logPendingResourceCount(String methodName, DuccId duccId, String label, Map<DuccId, IResource> pending) {
		if(pending == null) {
			return;
		}
		int mapSize = pending.size();
		if(mapSize > 0) {
			logger.info(methodName, duccId, messages.fetchLabel(label)+mapSize);
		}
		else {
			logger.trace(methodName, duccId, messages.fetchLabel(label)+mapSize);
		}
	}

	/**
	 * Completes a job or service with completion type {@code ResourcesUnavailable} because
	 * the RM refused or purged its allocation, logs a warning and records the RM reason in
	 * the user log.
	 *
	 * @param duccWorkJob     the job or service to complete
	 * @param rmResourceState RM state supplying the reason text
	 * @param cause           {@code "refused"} or {@code "purged"}; used both as the log
	 *                        label and inside the completion rationale text
	 * @param methodName      name of the calling method, used as the log location
	 * @param duccId          id used for logging
	 * @return the value returned by the state change to {@code JobState.Completed}
	 */
	private int completeResourcesUnavailable(DuccWorkJob duccWorkJob, IRmJobState rmResourceState, String cause, String methodName, DuccId duccId) {
		duccWorkJob.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
		duccWorkJob.setCompletionType(JobCompletionType.ResourcesUnavailable);
		duccWorkJob.setCompletionRationale(new Rationale("resource manager "+cause+" allocation: "+rmResourceState.getReason()));
		int stateChanges = stateChange(duccWorkJob,JobState.Completed);
		logger.warn(methodName, duccId, messages.fetchLabel(cause)+rmResourceState.getReason());
		String text = rmResourceState.getReason();
		UserLogging.record(duccWorkJob, text);
		return stateChanges;
	}