in tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java [289:424]
public synchronized void serviceInit(final Configuration conf) throws Exception {
int maxAppAttempts = 1;
String maxAppAttemptsEnv = System.getenv(
ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
if (maxAppAttemptsEnv != null) {
maxAppAttempts = Integer.valueOf(maxAppAttemptsEnv);
}
isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
this.amConf = conf;
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
dispatcher = createDispatcher();
context = new RunningAppContext(conf);
clientHandler = new DAGClientHandler(this);
addIfService(dispatcher, false);
clientRpcServer = new DAGClientServer(clientHandler, appAttemptID);
addIfService(clientRpcServer, true);
taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
addIfService(taskHeartbeatHandler, true);
containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
addIfService(containerHeartbeatHandler, true);
sessionToken =
TokenCache.getSessionToken(amTokens);
if (sessionToken == null) {
throw new RuntimeException("Could not find session token in AM Credentials");
}
// Prepare the TaskAttemptListener server for authentication of Containers
// TaskAttemptListener gets the information via jobTokenSecretManager.
LOG.info("Adding session token to jobTokenSecretManager for application");
jobTokenSecretManager.addTokenForJob(
appAttemptID.getApplicationId().toString(), sessionToken);
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context,
taskHeartbeatHandler, containerHeartbeatHandler);
addIfService(taskAttemptListener, true);
containerSignatureMatcher = createContainerSignatureMatcher();
containers = new AMContainerMap(containerHeartbeatHandler,
taskAttemptListener, containerSignatureMatcher, context);
addIfService(containers, true);
dispatcher.register(AMContainerEventType.class, containers);
nodes = new AMNodeMap(dispatcher.getEventHandler(), context);
addIfService(nodes, true);
dispatcher.register(AMNodeEventType.class, nodes);
this.dagEventDispatcher = new DagEventDispatcher();
this.vertexEventDispatcher = new VertexEventDispatcher();
//register the event dispatchers
dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler());
dispatcher.register(DAGEventType.class, dagEventDispatcher);
dispatcher.register(VertexEventType.class, vertexEventDispatcher);
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher());
this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher);
addIfService(taskSchedulerEventHandler, true);
if (isLastAMRetry) {
LOG.info("AM will unregister as this is the last attempt"
+ ", currentAttempt=" + appAttemptID.getAttemptId()
+ ", maxAttempts=" + maxAppAttempts);
this.taskSchedulerEventHandler.setShouldUnregisterFlag();
}
dispatcher.register(AMSchedulerEventType.class,
taskSchedulerEventHandler);
addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
containerLauncher = createContainerLauncher(context);
addIfService(containerLauncher, true);
dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
historyEventHandler = new HistoryEventHandler(context);
addIfService(historyEventHandler, true);
this.sessionTimeoutInterval = 1000 * amConf.getInt(
TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
String strAppId = this.appAttemptID.getApplicationId().toString();
this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, conf);
recoveryFS = recoveryDataDir.getFileSystem(conf);
currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
appAttemptID.getAttemptId());
if (LOG.isDebugEnabled()) {
LOG.info("Stage directory information for AppAttemptId :" + this.appAttemptID
+ " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir
+ " recoveryAttemptDir :" + currentRecoveryDataDir);
}
if (isSession) {
FileInputStream sessionResourcesStream = null;
try {
sessionResourcesStream = new FileInputStream(
TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME);
PlanLocalResourcesProto sessionLocalResourcesProto =
PlanLocalResourcesProto.parseDelimitedFrom(sessionResourcesStream);
PlanLocalResourcesProto amLocalResourceProto = PlanLocalResourcesProto
.parseDelimitedFrom(sessionResourcesStream);
sessionResources.putAll(DagTypeConverters.convertFromPlanLocalResources(
sessionLocalResourcesProto));
amResources.putAll(DagTypeConverters.convertFromPlanLocalResources(amLocalResourceProto));
amResources.putAll(sessionResources);
} finally {
if (sessionResourcesStream != null) {
sessionResourcesStream.close();
}
}
}
recoveryEnabled = conf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
initServices(conf);
super.serviceInit(conf);
AMLaunchedEvent launchedEvent = new AMLaunchedEvent(appAttemptID,
startTime, appSubmitTime, appMasterUgi.getShortUserName());
historyEventHandler.handle(
new DAGHistoryEvent(launchedEvent));
this.state = DAGAppMasterState.INITED;
}