in tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java [1457:1550]
/**
 * Starts the application master.
 *
 * <p>In order, this method: starts the component services and the parent service,
 * initializes the metrics system, records an {@code AMStartedEvent} in history, and
 * attempts to recover DAG data via {@code recoverDAG()}. Depending on the outcome it then
 * either dispatches a {@code DAGEventRecoverEvent} for the recovered DAG, or (non-session
 * mode with nothing recovered) resets the DAG counter and calls {@code startDAG()}.
 * In session mode a daemon timer is finally scheduled that periodically calls
 * {@code checkAndHandleSessionTimeout()}.
 *
 * <p>If {@code recoverDAG()} throws an {@code IOException}, the state is set to
 * {@code DAGAppMasterState.ERROR}, the scheduler is flagged to unregister, shutdown is
 * requested and the method returns without recovering or starting any DAG.
 *
 * @throws Exception propagated from any of the start-up steps
 */
public synchronized void serviceStart() throws Exception {
  // Start all the components before the parent service itself.
  startServices();
  super.serviceStart();

  // metrics system init is really init & start.
  // It's more test friendly to put it here.
  DefaultMetricsSystem.initialize("DAGAppMaster");

  this.appsStartTime = clock.getTime();
  AMStartedEvent startEvent = new AMStartedEvent(appAttemptID,
      appsStartTime, appMasterUgi.getShortUserName());
  historyEventHandler.handle(new DAGHistoryEvent(startEvent));

  this.lastDAGCompletionTime = clock.getTime();

  RecoveredDAGData recoveredDAGData;
  try {
    recoveredDAGData = recoverDAG();
  } catch (IOException e) {
    LOG.error("Error occurred when trying to recover data from previous attempt."
        + " Shutting down AM", e);
    this.state = DAGAppMasterState.ERROR;
    this.taskSchedulerEventHandler.setShouldUnregisterFlag();
    shutdownHandler.shutdown();
    return;
  }

  if (!isSession) {
    LOG.info("In Non-Session mode.");
  } else {
    LOG.info("In Session mode. Waiting for DAG over RPC");
    // May be overwritten with RUNNING below if a DAG was recovered.
    this.state = DAGAppMasterState.IDLE;
  }

  if (recoveredDAGData != null) {
    // Re-register any additional resources that had been accumulated for the recovered DAG.
    List<URL> classpathUrls = null;
    if (recoveredDAGData.cumulativeAdditionalResources != null) {
      classpathUrls = processAdditionalResources(recoveredDAGData.cumulativeAdditionalResources);
      amResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
      cumulativeAdditionalResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
    }

    final DAGEventRecoverEvent recoverDAGEvent;
    if (recoveredDAGData.isCompleted
        || recoveredDAGData.nonRecoverable) {
      // String concatenation already renders a null state/reason as "null".
      LOG.info("Found previous DAG in completed or non-recoverable state"
          + ", dagId=" + recoveredDAGData.recoveredDagID
          + ", isCompleted=" + recoveredDAGData.isCompleted
          + ", isNonRecoverable=" + recoveredDAGData.nonRecoverable
          + ", state=" + recoveredDAGData.dagState
          + ", failureReason=" + recoveredDAGData.reason);
      _updateLoggers(recoveredDAGData.recoveredDAG, "");
      if (recoveredDAGData.nonRecoverable) {
        // A non-recoverable DAG is always recovered into the FAILED state.
        recoverDAGEvent =
            new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
                DAGState.FAILED, classpathUrls);
      } else {
        // A completed DAG is recovered into the state it had already reached.
        recoverDAGEvent =
            new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
                recoveredDAGData.dagState, classpathUrls);
      }
    } else {
      LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID());
      _updateLoggers(recoveredDAGData.recoveredDAG, "");
      recoverDAGEvent = new DAGEventRecoverEvent(
          recoveredDAGData.recoveredDAG.getID(), classpathUrls);
    }

    // Every recovery path dispatches its event and then marks the AM as running.
    dagEventDispatcher.handle(recoverDAGEvent);
    this.state = DAGAppMasterState.RUNNING;
  } else if (!isSession) {
    // No dag recovered - in non-session, just restart the original DAG
    dagCounter.set(0);
    startDAG();
  }

  if (isSession) {
    // java.util.Timer rejects a non-positive period, and the integer division below
    // yields 0 for any interval shorter than 10. Clamp to 1 ms so that such a setting
    // cannot abort start-up after recovery has already been kicked off.
    // NOTE(review): a negative interval is still rejected by Timer (negative delay),
    // exactly as before - confirm whether that value is meant to disable the timeout.
    final long timeoutCheckPeriod = Math.max(1L, sessionTimeoutInterval / 10);
    this.dagSubmissionTimer = new Timer(true);
    this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
      @Override
      public void run() {
        checkAndHandleSessionTimeout();
      }
    }, sessionTimeoutInterval, timeoutCheckPeriod);
  }
}