public void reconcileState()

in uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java [602:775]


	/**
	 * Reconcile the orchestrator's record of a job with a status report
	 * received from that job's driver (JD).
	 * <p>
	 * While holding the monitor of the work map, the job is looked up by the
	 * friendly id carried in the report. When the job is found:
	 * <ul>
	 * <li>the JD JMX URL is set on the job and the investment, process
	 *     work-item and driver work-item reports are copied via the
	 *     corresponding copy helpers;</li>
	 * <li>the work-item total/done/error counters are refreshed from the
	 *     report unless the job state is already {@code Completed};</li>
	 * <li>the job is terminated when the report shows zero total work items
	 *     or has its kill flag set; otherwise the reported driver state
	 *     selects the job state transition to apply;</li>
	 * <li>the report is handed to process accounting, then
	 *     {@code deallocateIdleProcesses} and {@code deallocateFailedProcesses}
	 *     are invoked.</li>
	 * </ul>
	 * When the job is not found a warning is logged. After the monitor has
	 * been released, the checkpoint is saved if either deallocation call
	 * returned {@code true}.
	 * <p>
	 * The {@code TrackSync} obtained before entering the synchronized block is
	 * ended in a {@code finally} clause, so an exception thrown while
	 * reconciling still results in {@code ended()} being called.
	 *
	 * @param jdStatusReport the status report from the job driver; it is
	 *                       dereferenced without a null check
	 */
	public void reconcileState(IDriverStatusReport jdStatusReport) {
		String methodName = "reconcileState (JD)";
		logger.trace(methodName, null, messages.fetch("enter"));
		DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
		int changes = 0;
		TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
		try {
			synchronized(workMap) {
				ts.using();
				DuccId duccId = jdStatusReport.getDuccId();
				String sid = ""+duccId.getFriendly();
				DuccWorkJob duccWorkJob = (DuccWorkJob) WorkMapHelper.findDuccWork(workMap, sid, this, methodName);
				if(duccWorkJob != null) {
					// Carry the driver-reported data over to the job.
					String jdJmxUrl = jdStatusReport.getJdJmxUrl();
					setJdJmxUrl(duccWorkJob, jdJmxUrl);
					copyInvestmentReport(duccWorkJob, jdStatusReport);
					copyProcessWorkItemsReport(duccWorkJob, jdStatusReport);
					copyDriverWorkItemsReport(duccWorkJob, jdStatusReport);
					logger.debug(methodName, duccId, duccWorkJob.getJobState(), "total="+jdStatusReport.getWorkItemsTotal(), "completed="+jdStatusReport.getWorkItemsProcessingCompleted(), "error="+jdStatusReport.getWorkItemsProcessingError());
					// A job already in the Completed state keeps its recorded
					// work-item counters; every other state (Completing
					// included) takes the values from the report.
					switch(duccWorkJob.getJobState()) {
					case Completed:
						break;
					case Completing:
					default:
						duccWorkJob.setWiTotal(jdStatusReport.getWorkItemsTotal());
						duccWorkJob.setWiDone(jdStatusReport.getWorkItemsProcessingCompleted());
						duccWorkJob.setWiError(jdStatusReport.getWorkItemsProcessingError());
						break;
					}
					// Termination conditions take precedence over the
					// driver-state driven transitions below.
					IRationale rationale;
					if(jdStatusReport.getWorkItemsTotal() == 0) {
						jobTerminate(duccWorkJob, JobCompletionType.NoWorkItemsFound, new Rationale("job driver had no work items to process"), ProcessDeallocationType.JobCanceled);
					}
					else if(jdStatusReport.isKillJob()) {
						rationale = jdStatusReport.getJobCompletionRationale();
						jobTerminate(duccWorkJob, JobCompletionType.CanceledByDriver, rationale, ProcessDeallocationType.JobFailure);
					}
					else {
						switch(jdStatusReport.getDriverState()) {
						case Failed:
							rationale = jdStatusReport.getJobCompletionRationale();
							jobTerminate(duccWorkJob, JobCompletionType.CanceledByDriver, rationale, ProcessDeallocationType.JobFailure);
							break;
						case NotRunning:
							break;
						case Initializing:
							switch(duccWorkJob.getJobState()) {
							case WaitingForDriver:
								addJdUrlToJpCommandLine(duccWorkJob, jdStatusReport);
								addDeployableToJpCommandLine(duccWorkJob, jdStatusReport);
								// Stay in WaitingForDriver until the job has a JD URL.
								if(!duccWorkJob.isJdURLSpecified()) {
									logger.debug(methodName, duccId, "No JdURL provided yet - still waitingForDriver");
									break;
								}
								JobState nextState = JobState.WaitingForServices;
								if(duccWorkJob.getServiceDependencies() == null) {
									// The message is built before nextState is
									// reassigned, so it names the state being bypassed.
									String message = messages.fetch("bypass")+" "+nextState;
									logger.debug(methodName, duccId, message);
									nextState = JobState.WaitingForResources;
								}
								stateJobAccounting.stateChange(duccWorkJob, nextState);
								break;
							case Assigned:
								stateJobAccounting.stateChange(duccWorkJob, JobState.Initializing);
								break;
							case Initializing:
								break;
							default:
								break;
							}
							break;
						case Running:
						case Idle:
							// NOTE(review): the enclosing else-if has already handled
							// jdStatusReport.isKillJob() == true, so this branch looks
							// unreachable unless the flag changes between the two
							// calls - confirm whether the "waiting for services"
							// rationale was meant to be applied in that earlier branch.
							if(jdStatusReport.isKillJob()) {
								rationale = jdStatusReport.getJobCompletionRationale();
								switch(duccWorkJob.getJobState()) {
								case WaitingForServices:
									// Append (or substitute) the "waiting for services"
									// text, depending on whether the driver supplied
									// a specified rationale.
									if(rationale == null) {
										rationale = new Rationale("waiting for services");
									}
									else {
										if(rationale.isSpecified()) {
											String text = rationale.getText();
											rationale = new Rationale(text+": "+"waiting for services");
										}
										else {
											rationale = new Rationale("waiting for services");
										}
									}
									break;
								default:
									break;
								}
								jobTerminate(duccWorkJob, JobCompletionType.CanceledByDriver, rationale, ProcessDeallocationType.JobFailure);
								break;
							}
							switch(duccWorkJob.getJobState()) {
							case WaitingForDriver:
								stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForServices);
								break;
							case Assigned:
							case Initializing:
								stateJobAccounting.stateChange(duccWorkJob, JobState.Running);
								break;
							default:
								break;
							}
							break;
						case Completing:
							// The state change is skipped for a finished job, but
							// the stop requests are issued either way.
							if(!duccWorkJob.isFinished()) {
								stateJobAccounting.stateChange(duccWorkJob, JobState.Completing);
							}
							stopJps(duccWorkJob);
							stopJd(duccWorkJob);
							break;
						case Completed:
							if(!duccWorkJob.isCompleted()) {
								if(!duccWorkJob.isFinished()) {
									stateJobAccounting.stateChange(duccWorkJob, JobState.Completing);
								}
								stopJps(duccWorkJob);
								stopJd(duccWorkJob);
								duccWorkJob.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
								switch(jdStatusReport.getJobCompletionType()) {
								case EndOfJob:
									// For a driver-reported EndOfJob the completion
									// type and rationale are chosen here from the
									// reported error and completed counts.
									try {
										int errors = jdStatusReport.getWorkItemsProcessingError();
										int done = jdStatusReport.getWorkItemsProcessingCompleted();
										if(errors > 0) {
											setCompletionIfNotAlreadySet(duccWorkJob, JobCompletionType.Error, new Rationale("state manager detected error work items="+errors));
										}
										else if(done == 0) {
											setCompletionIfNotAlreadySet(duccWorkJob, JobCompletionType.EndOfJob, new Rationale("state manager detected no work items processed"));
										}
										else {
											setCompletionIfNotAlreadySet(duccWorkJob, JobCompletionType.EndOfJob, new Rationale("state manager detected normal completion"));
										}
									}
									catch(Exception e) {
										logger.error(methodName, duccId, e);
									}
									break;
								default:
									// Any other completion type is taken from the report.
									setCompletionIfNotAlreadySet(duccWorkJob, jdStatusReport);
									break;
								}
							}
							break;
						case Undefined:
							break;
						}
					}
					// Update process accounting, then count each deallocation
					// call that returns true so the checkpoint is saved below.
					OrchestratorCommonArea.getInstance().getProcessAccounting().setStatus(jdStatusReport,duccWorkJob);
					if(deallocateIdleProcesses(duccWorkJob, jdStatusReport)) {
						changes++;
					}
					if(deallocateFailedProcesses(duccWorkJob, jdStatusReport)) {
						changes++;
					}
				}
				else {
					logger.warn(methodName, duccId, messages.fetch("not found"));
				}
			}
		}
		finally {
			// Runs after the monitor is released, on both the normal and the
			// exceptional path, so the TrackSync is always ended.
			ts.ended();
		}
		if(changes > 0) {
			OrchestratorCheckpoint.getInstance().saveState();
		}
		logger.trace(methodName, null, messages.fetch("exit"));
	}