in uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java [602:775]
/**
 * Reconcile the orchestrator's view of a job with a status report received
 * from that job's driver (JD).
 *
 * <p>The job is looked up in the work map by the friendly id of the report's
 * DuccId. When it is found, under the work map monitor:
 * <ol>
 *   <li>the JD JMX URL and the investment / work item reports are copied
 *       onto the job via the copy helpers;</li>
 *   <li>the job's work item counters (total, done, error) are refreshed
 *       unless the job state is already Completed;</li>
 *   <li>the job is terminated when the report shows zero total work items
 *       or requests a kill; otherwise the job state is advanced according
 *       to the reported driver state;</li>
 *   <li>process status is updated and idle / failed processes are
 *       deallocated.</li>
 * </ol>
 * When the job is not found a warning is logged and nothing else happens.
 * After the monitor is released, orchestrator state is checkpointed if
 * either deallocation step reported a change.
 *
 * @param jdStatusReport the status report published by the job driver;
 *                       dereferenced without a null check
 */
public void reconcileState(IDriverStatusReport jdStatusReport) {
    String methodName = "reconcileState (JD)";
    logger.trace(methodName, null, messages.fetch("enter"));
    DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
    int changes = 0;
    TrackSync ts = TrackSync.await(workMap, this.getClass(), methodName);
    try {
        synchronized(workMap) {
            ts.using();
            DuccId duccId = jdStatusReport.getDuccId();
            String sid = ""+duccId.getFriendly();
            DuccWorkJob duccWorkJob = (DuccWorkJob) WorkMapHelper.findDuccWork(workMap, sid, this, methodName);
            if(duccWorkJob != null) {
                //
                String jdJmxUrl = jdStatusReport.getJdJmxUrl();
                setJdJmxUrl(duccWorkJob, jdJmxUrl);
                //
                copyInvestmentReport(duccWorkJob, jdStatusReport);
                copyProcessWorkItemsReport(duccWorkJob, jdStatusReport);
                copyDriverWorkItemsReport(duccWorkJob, jdStatusReport);
                //
                logger.debug(methodName, duccId, duccWorkJob.getJobState(), "total="+jdStatusReport.getWorkItemsTotal(), "completed="+jdStatusReport.getWorkItemsProcessingCompleted(), "error="+jdStatusReport.getWorkItemsProcessingError());
                // Refresh the work item counters in every job state except Completed.
                switch(duccWorkJob.getJobState()) {
                case Completed:
                    break;
                case Completing:
                default:
                    duccWorkJob.setWiTotal(jdStatusReport.getWorkItemsTotal());
                    duccWorkJob.setWiDone(jdStatusReport.getWorkItemsProcessingCompleted());
                    duccWorkJob.setWiError(jdStatusReport.getWorkItemsProcessingError());
                    break;
                }
                //
                IRationale rationale;
                if(jdStatusReport.getWorkItemsTotal() == 0) {
                    // A report with zero total work items terminates the job.
                    jobTerminate(duccWorkJob, JobCompletionType.NoWorkItemsFound, new Rationale("job driver had no work items to process"), ProcessDeallocationType.JobCanceled);
                }
                else if(jdStatusReport.isKillJob()) {
                    // The driver asked for the job to be killed.
                    rationale = jdStatusReport.getJobCompletionRationale();
                    jobTerminate(duccWorkJob, JobCompletionType.CanceledByDriver, rationale, ProcessDeallocationType.JobFailure);
                }
                else {
                    // No termination requested: advance the job according to the driver state.
                    switch(jdStatusReport.getDriverState()) {
                    case Failed:
                        rationale = jdStatusReport.getJobCompletionRationale();
                        jobTerminate(duccWorkJob, JobCompletionType.CanceledByDriver, rationale, ProcessDeallocationType.JobFailure);
                        break;
                    case NotRunning:
                        break;
                    case Initializing:
                        switch(duccWorkJob.getJobState()) {
                        case WaitingForDriver:
                            addJdUrlToJpCommandLine(duccWorkJob, jdStatusReport);
                            addDeployableToJpCommandLine(duccWorkJob, jdStatusReport);
                            if(!duccWorkJob.isJdURLSpecified()) {
                                logger.debug(methodName, duccId, "No JdURL provided yet - still waitingForDriver");
                                break;
                            }
                            JobState nextState = JobState.WaitingForServices;
                            if(duccWorkJob.getServiceDependencies() == null) {
                                // No service dependencies: log the bypassed state and go straight to WaitingForResources.
                                String message = messages.fetch("bypass")+" "+nextState;
                                logger.debug(methodName, duccId, message);
                                nextState = JobState.WaitingForResources;
                            }
                            stateJobAccounting.stateChange(duccWorkJob, nextState);
                            break;
                        case Assigned:
                            stateJobAccounting.stateChange(duccWorkJob, JobState.Initializing);
                            break;
                        case Initializing:
                            break;
                        default:
                            break;
                        }
                        break;
                    case Running:
                    case Idle:
                        // NOTE(review): the enclosing else-if already handles isKillJob(), so this
                        // branch looks unreachable unless the report's kill flag can change between
                        // the two calls. Kept unchanged; confirm whether the "waiting for services"
                        // rationale was meant to apply in the outer kill path.
                        if(jdStatusReport.isKillJob()) {
                            rationale = jdStatusReport.getJobCompletionRationale();
                            switch(duccWorkJob.getJobState()) {
                            case WaitingForServices:
                                if(rationale == null) {
                                    rationale = new Rationale("waiting for services");
                                }
                                else {
                                    if(rationale.isSpecified()) {
                                        String text = rationale.getText();
                                        rationale = new Rationale(text+": "+"waiting for services");
                                    }
                                    else {
                                        rationale = new Rationale("waiting for services");
                                    }
                                }
                                break;
                            default:
                                break;
                            }
                            jobTerminate(duccWorkJob, JobCompletionType.CanceledByDriver, rationale, ProcessDeallocationType.JobFailure);
                            break;
                        }
                        switch(duccWorkJob.getJobState()) {
                        case WaitingForDriver:
                            stateJobAccounting.stateChange(duccWorkJob, JobState.WaitingForServices);
                            break;
                        case Assigned:
                        case Initializing:
                            stateJobAccounting.stateChange(duccWorkJob, JobState.Running);
                            break;
                        default:
                            break;
                        }
                        break;
                    case Completing:
                        if(!duccWorkJob.isFinished()) {
                            stateJobAccounting.stateChange(duccWorkJob, JobState.Completing);
                        }
                        stopJps(duccWorkJob);
                        stopJd(duccWorkJob);
                        break;
                    case Completed:
                        if(!duccWorkJob.isCompleted()) {
                            if(!duccWorkJob.isFinished()) {
                                stateJobAccounting.stateChange(duccWorkJob, JobState.Completing);
                            }
                            stopJps(duccWorkJob);
                            stopJd(duccWorkJob);
                            duccWorkJob.getStandardInfo().setDateOfCompletion(TimeStamp.getCurrentMillis());
                            switch(jdStatusReport.getJobCompletionType()) {
                            case EndOfJob:
                                // Driver reports end-of-job: derive completion type and rationale
                                // from the reported error / completed work item counts.
                                try {
                                    int errors = jdStatusReport.getWorkItemsProcessingError();
                                    int done = jdStatusReport.getWorkItemsProcessingCompleted();
                                    if(errors > 0) {
                                        setCompletionIfNotAlreadySet(duccWorkJob, JobCompletionType.Error, new Rationale("state manager detected error work items="+errors));
                                    }
                                    else if(done == 0) {
                                        setCompletionIfNotAlreadySet(duccWorkJob, JobCompletionType.EndOfJob, new Rationale("state manager detected no work items processed"));
                                    }
                                    else {
                                        setCompletionIfNotAlreadySet(duccWorkJob, JobCompletionType.EndOfJob, new Rationale("state manager detected normal completion"));
                                    }
                                }
                                catch(Exception e) {
                                    logger.error(methodName, duccId, e);
                                }
                                break;
                            default:
                                // Any other completion type is taken from the report itself.
                                setCompletionIfNotAlreadySet(duccWorkJob, jdStatusReport);
                                break;
                            }
                        }
                        break;
                    case Undefined:
                        break;
                    }
                }
                //
                OrchestratorCommonArea.getInstance().getProcessAccounting().setStatus(jdStatusReport,duccWorkJob);
                // Each deallocation step that reports a change triggers a checkpoint below.
                if(deallocateIdleProcesses(duccWorkJob, jdStatusReport)) {
                    changes++;
                }
                if(deallocateFailedProcesses(duccWorkJob, jdStatusReport)) {
                    changes++;
                }
            }
            else {
                logger.warn(methodName, duccId, messages.fetch("not found"));
            }
        }
    }
    finally {
        // Always close the TrackSync record, even when reconciliation throws;
        // this still runs after the workMap monitor has been released.
        ts.ended();
    }
    if(changes > 0) {
        OrchestratorCheckpoint.getInstance().saveState();
    }
    logger.trace(methodName, null, messages.fetch("exit"));
}