in tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java [443:540]
private synchronized void handle(DAGAppMasterEvent event) {
switch (event.getType()) {
case INTERNAL_ERROR:
state = DAGAppMasterState.ERROR;
if(currentDAG != null) {
_updateLoggers(currentDAG, "_post");
// notify dag to finish which will send the DAG_FINISHED event
LOG.info("Internal Error. Notifying dags to finish.");
sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.INTERNAL_ERROR));
} else {
LOG.info("Internal Error. Finishing directly as no dag is active.");
this.taskSchedulerEventHandler.setShouldUnregisterFlag();
shutdownHandler.shutdown();
}
break;
case DAG_FINISHED:
DAGAppMasterEventDAGFinished finishEvt =
(DAGAppMasterEventDAGFinished) event;
if (!isSession) {
LOG.info("Not a session, AM will unregister as DAG has completed");
this.taskSchedulerEventHandler.setShouldUnregisterFlag();
_updateLoggers(currentDAG, "_post");
setStateOnDAGCompletion();
LOG.info("Shutting down on completion of dag:" +
finishEvt.getDAGId().toString());
shutdownHandler.shutdown();
} else {
LOG.info("DAG completed, dagId="
+ finishEvt.getDAGId().toString()
+ ", dagState=" + finishEvt.getDAGState());
lastDAGCompletionTime = clock.getTime();
_updateLoggers(currentDAG, "_post");
if (this.historyEventHandler.hasRecoveryFailed()) {
LOG.warn("Recovery had a fatal error, shutting down session after" +
" DAG completion");
sessionStopped.set(true);
}
switch(finishEvt.getDAGState()) {
case SUCCEEDED:
if (!currentDAG.getName().startsWith(
TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
successfulDAGs.incrementAndGet();
}
break;
case FAILED:
if (!currentDAG.getName().startsWith(
TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
failedDAGs.incrementAndGet();
}
break;
case KILLED:
if (!currentDAG.getName().startsWith(
TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
killedDAGs.incrementAndGet();
}
break;
case ERROR:
if (!currentDAG.getName().startsWith(
TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
failedDAGs.incrementAndGet();
}
default:
LOG.fatal("Received a DAG Finished Event with state="
+ finishEvt.getDAGState()
+ ". Error. Shutting down.");
state = DAGAppMasterState.ERROR;
this.taskSchedulerEventHandler.setShouldUnregisterFlag();
shutdownHandler.shutdown();
break;
}
if (!state.equals(DAGAppMasterState.ERROR)) {
if (!sessionStopped.get()) {
LOG.info("Waiting for next DAG to be submitted.");
this.taskSchedulerEventHandler.dagCompleted();
state = DAGAppMasterState.IDLE;
} else {
LOG.info("Session shutting down now.");
this.taskSchedulerEventHandler.setShouldUnregisterFlag();
if (this.historyEventHandler.hasRecoveryFailed()) {
state = DAGAppMasterState.FAILED;
} else {
state = DAGAppMasterState.SUCCEEDED;
}
shutdownHandler.shutdown();
}
}
}
break;
case AM_REBOOT:
LOG.info("Received an AM_REBOOT signal");
this.state = DAGAppMasterState.KILLED;
shutdownHandler.shutdown(true);
break;
default:
throw new TezUncheckedException(
"AppMaster: No handler for event type: " + event.getType());
}
}