in tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java [1994:2174]
public void serviceStart() throws Exception {
  // Startup sequence for the AM:
  //   1. start the component services and the parent service;
  //   2. shut down early on a version mismatch, on an invalid session
  //      re-attempt, or when reading recovery data fails;
  //   3. otherwise resume the DAG recovered from a previous attempt, start the
  //      DAG from the plan file (non-session mode), or go IDLE and wait for a
  //      DAG to be submitted (session mode);
  //   4. in session mode, arm the DAG-submission and client-heartbeat timeout
  //      checks.

  //start all the components
  startServices();
  super.serviceStart();

  // A session AM on a later attempt with recovery disabled cannot continue:
  // record the error and mark the AM as ERROR before short-circuiting below.
  boolean invalidSession = false;
  if (isSession && !recoveryEnabled && appAttemptID.getAttemptId() > 1) {
    String err = INVALID_SESSION_ERR_MSG;
    LOG.error(err);
    addDiagnostic(err);
    this.state = DAGAppMasterState.ERROR;
    invalidSession = true;
  }
  if (versionMismatch || invalidSession) {
    // Short-circuit and return as no DAG should be run
    this.taskSchedulerManager.setShouldUnregisterFlag();
    shutdownHandler.shutdown();
    return;
  }

  // Record the AM start in the history log.
  this.appsStartTime = clock.getTime();
  AMStartedEvent startEvent = new AMStartedEvent(appAttemptID,
      appsStartTime, appMasterUgi.getShortUserName());
  historyEventHandler.handle(
      new DAGHistoryEvent(startEvent));

  this.lastDAGCompletionTime = clock.getTime();

  // Try to load recovery data; an I/O failure here is fatal for this attempt.
  DAGRecoveryData recoveredDAGData;
  try {
    recoveredDAGData = recoverDAG();
  } catch (IOException e) {
    LOG.error("Error occurred when trying to recover data from previous attempt."
        + " Shutting down AM", e);
    this.state = DAGAppMasterState.ERROR;
    this.taskSchedulerManager.setShouldUnregisterFlag();
    shutdownHandler.shutdown();
    return;
  }

  DAGPlan dagPlan = null;
  if (!isSession) {
    LOG.info("In Non-Session mode.");
    dagPlan = readDAGPlanFile();
    if (hasConcurrentEdge(dagPlan)) {
      // Currently a DAG with concurrent edge is deemed unrecoverable
      // (run from scratch) on AM failover. Proper AM failover for DAG with
      // concurrent edge is pending TEZ-4017
      if (recoveredDAGData != null) {
        LOG.warn("Ignoring recoveredDAGData for a recovered DAG with concurrent edge.");
        recoveredDAGData = null;
      }
    }
  } else {
    LOG.info("In Session mode. Waiting for DAG over RPC");
    this.state = DAGAppMasterState.IDLE;
  }

  if (recoveredDAGData != null) {
    // Re-register the additional resources accumulated by the previous attempt.
    if (recoveredDAGData.cumulativeAdditionalResources != null) {
      recoveredDAGData.additionalUrlsForClasspath = processAdditionalResources(
          recoveredDAGData.recoveredDagID,
          recoveredDAGData.cumulativeAdditionalResources);
      amResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
      cumulativeAdditionalResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
    }

    if (recoveredDAGData.isSessionStopped) {
      LOG.info("AM crashed when shutting down in the previous attempt"
          + ", continue the shutdown and recover it to SUCCEEDED");
      this.sessionStopped.set(true);
      // Returning here also skips arming the timeout checks at the end of this method.
      return;
    }

    if (recoveredDAGData.isCompleted
        || recoveredDAGData.nonRecoverable) {
      LOG.info("Found previous DAG in completed or non-recoverable state"
          + ", dagId=" + recoveredDAGData.recoveredDagID
          + ", isCompleted=" + recoveredDAGData.isCompleted
          + ", isNonRecoverable=" + recoveredDAGData.nonRecoverable
          + ", state=" + (recoveredDAGData.dagState == null ? "null" :
          recoveredDAGData.dagState)
          + ", failureReason=" + recoveredDAGData.reason);
      updateLoggers(recoveredDAGData.recoveredDAG, "");
      if (recoveredDAGData.nonRecoverable) {
        // Non-recoverable: publish the DAG as FAILED together with the recorded reason.
        addDiagnostic("DAG " + recoveredDAGData.recoveredDagID + " can not be recovered due to "
            + recoveredDAGData.reason);
        DAGEventRecoverEvent recoverDAGEvent =
            new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
                DAGState.FAILED, recoveredDAGData);
        DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID,
            recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(),
            recoveredDAGData.recoveredDAG.getUserName(),
            this.clock.getTime(), DAGState.FAILED, recoveredDAGData.reason,
            this.containerLogs);
        dagRecoveredEvent.setHistoryLoggingEnabled(
            recoveredDAGData.recoveredDAG.getConf().getBoolean(
                TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED,
                TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT));
        this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(),
            dagRecoveredEvent));
        dagEventDispatcher.handle(recoverDAGEvent);
        this.state = DAGAppMasterState.RUNNING;
      } else {
        // Already completed: publish the DAG with the state recorded in the recovery data.
        DAGEventRecoverEvent recoverDAGEvent =
            new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
                recoveredDAGData.dagState, recoveredDAGData);
        DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID,
            recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(),
            recoveredDAGData.recoveredDAG.getUserName(), this.clock.getTime(),
            recoveredDAGData.dagState, null, this.containerLogs);
        this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(),
            dagRecoveredEvent));
        dagEventDispatcher.handle(recoverDAGEvent);
        this.state = DAGAppMasterState.RUNNING;
      }
    } else {
      LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID());
      updateLoggers(recoveredDAGData.recoveredDAG, "");
      DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID,
          recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(),
          recoveredDAGData.recoveredDAG.getUserName(), this.clock.getTime(), this.containerLogs);
      this.historyEventHandler.handle(new DAGHistoryEvent(recoveredDAGData.recoveredDAG.getID(),
          dagRecoveredEvent));
      DAGEventRecoverEvent recoverDAGEvent = new DAGEventRecoverEvent(
          recoveredDAGData.recoveredDAG.getID(), recoveredDAGData);
      dagEventDispatcher.handle(recoverDAGEvent);
      // If we reach here, then we have recoverable DAG and we need to
      // reinitialize the vertex services including speculators.
      currentDAG.onStart();
      this.state = DAGAppMasterState.RUNNING;
    }
  } else {
    if (!isSession) {
      // No dag recovered - in non-session, just restart the original DAG
      dagCounter.set(0);
      assert(dagPlan != null);
      startDAG(dagPlan, null);
    }
  }

  if (isSession && sessionTimeoutInterval >= 0) {
    // java.util.Timer rejects a period <= 0 with IllegalArgumentException, and
    // the integer division below yields 0 for any interval smaller than 10.
    // Clamp the period to at least 1 so that small intervals accepted by the
    // guard above cannot abort startup. Intervals of 10 or more are unaffected.
    final long timeoutCheckPeriod = Math.max(1L, sessionTimeoutInterval / 10);
    this.dagSubmissionTimer = new Timer("DAGSubmissionTimer", true);
    this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
      @Override
      public void run() {
        try {
          checkAndHandleSessionTimeout();
        } catch (TezException e) {
          LOG.error("Error when checking AM session timeout", e);
        }
      }
    }, sessionTimeoutInterval, timeoutCheckPeriod);
  }

  // Ignore client heartbeat timeout in local mode or non-session mode
  if (!isLocal && isSession && clientAMHeartbeatTimeoutIntervalMillis > 0) {
    // reset heartbeat time
    clientHandler.updateLastHeartbeatTime();
    this.clientAMHeartBeatTimeoutService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryBuilder()
            .setDaemon(true).setNameFormat("ClientAMHeartBeatKeepAliveCheck #%d").build()
    );
    // One-shot task that re-schedules itself for as long as the check reports
    // a positive next-expiry delay.
    this.clientAMHeartBeatTimeoutService.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          long nextExpiry = checkAndHandleDAGClientTimeout();
          if (nextExpiry > 0) {
            clientAMHeartBeatTimeoutService.schedule(this, nextExpiry, TimeUnit.MILLISECONDS);
          }
        } catch (TezException e) {
          // Cannot be thrown unless the AM is being tried to shutdown so no need to
          // reschedule the timer task
          LOG.error("Error when checking Client AM heartbeat timeout", e);
        }
      }
    }, clientAMHeartbeatTimeoutIntervalMillis, TimeUnit.MILLISECONDS);
  }
}