in tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java [430:655]
protected void serviceInit(final Configuration conf) throws Exception {
  // Initializes the DAG application master.
  //
  // In order, this method: stores the configuration, parses the plugin descriptors,
  // compares the client version with the AM version, creates the dispatcher, resolves the
  // staging/recovery directories, builds the AM components and registers them as services
  // and event handlers, creates the shared executor, and finally emits the launch history
  // events.
  //
  // On exit, state is INITED, or ERROR when a client/AM version mismatch was found and the
  // version check is not disabled.
  //
  // conf: the configuration for this AM. It is also kept as amConf and is mutated when
  //       running in local mode.
  // Throws RuntimeException when no session token is present in the AM credentials; any
  // exception raised by the helpers called here propagates unchanged.
  this.amConf = conf;
  initResourceCalculatorPlugins();
  this.hadoopShim = new HadoopShimsLoader(this.amConf).getHadoopShim();

  // The value is read from the *_MILLIS key, hence the local's name. A negative value
  // leaves the shutdown handler's sleep time untouched.
  long sleepTimeBeforeExitMillis = this.amConf.getLong(
      TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS,
      TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT);
  if (sleepTimeBeforeExitMillis >= 0) {
    this.shutdownHandler.setSleepTimeBeforeExit(sleepTimeBeforeExitMillis);
  }

  this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
      TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);

  // Plugin parsing: the descriptor lists are filled in by parseAllPlugins and handed to the
  // corresponding managers created further down.
  // NOTE(review): defaultPayload is built here, before the local-mode overrides below are
  // written into conf - confirm that plugins are not expected to see those overrides.
  UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf);

  List<NamedEntityDescriptor> taskSchedulerDescriptors = Lists.newLinkedList();
  List<NamedEntityDescriptor> containerLauncherDescriptors = Lists.newLinkedList();
  List<NamedEntityDescriptor> taskCommunicatorDescriptors = Lists.newLinkedList();

  parseAllPlugins(taskSchedulerDescriptors, taskSchedulers, containerLauncherDescriptors,
      containerLaunchers, taskCommunicatorDescriptors, taskCommunicators, amPluginDescriptorProto,
      isLocal, defaultPayload);

  LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers"));
  LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers"));
  LOG.info(buildPluginComponentLog(taskCommunicatorDescriptors, taskCommunicators, "TaskCommunicators"));

  boolean disableVersionCheck = conf.getBoolean(
      TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK,
      TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK_DEFAULT);

  // Check client - AM version compatibility.
  // A mismatch is always recorded as a diagnostic; it only flips versionMismatch (and thus
  // the final ERROR state) when the check has not been disabled.
  LOG.info("Comparing client version with AM version"
      + ", clientVersion=" + clientVersion
      + ", AMVersion=" + dagVersionInfo.getVersion());
  Simple2LevelVersionComparator versionComparator = new Simple2LevelVersionComparator();
  if (versionComparator.compare(clientVersion, dagVersionInfo.getVersion()) != 0) {
    versionMismatchDiagnostics = "Incompatible versions found"
        + ", clientVersion=" + clientVersion
        + ", AMVersion=" + dagVersionInfo.getVersion();
    addDiagnostic(versionMismatchDiagnostics);
    if (disableVersionCheck) {
      LOG.warn("Ignoring client-AM version mismatch as check disabled. "
          + versionMismatchDiagnostics);
    } else {
      LOG.error(versionMismatchDiagnostics);
      versionMismatch = true;
    }
  }

  dispatcher = createDispatcher();

  if (isLocal) {
    // Local mode: turn off node blacklisting and force the default history logging service.
    conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
        TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
  } else {
    dispatcher.enableExitOnDispatchException();
  }

  String strAppId = this.appAttemptID.getApplicationId().toString();
  this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);

  context = new RunningAppContext(conf);
  this.aclManager = new ACLManager(appMasterUgi.getShortUserName(), this.amConf);

  clientHandler = new DAGClientHandler(this);

  addIfService(dispatcher, false);

  // Recovery locations: a shared recovery directory under the staging dir plus a
  // per-attempt sub-directory derived from the attempt id.
  recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, conf);
  recoveryFS = recoveryDataDir.getFileSystem(conf);
  currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
      appAttemptID.getAttemptId());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID
        + " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir
        + " recoveryAttemptDir :" + currentRecoveryDataDir);
  }
  recoveryEnabled = conf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
      TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);

  initClientRpcServer();

  taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
  addIfService(taskHeartbeatHandler, true);

  containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
  addIfService(containerHeartbeatHandler, true);

  // The session token is mandatory; initialization fails fast without it.
  jobTokenSecretManager = new JobTokenSecretManager(amConf);
  sessionToken =
      TokenCache.getSessionToken(amCredentials);
  if (sessionToken == null) {
    throw new RuntimeException("Could not find session token in AM Credentials");
  }

  // Prepare the TaskAttemptListener server for authentication of Containers
  // TaskAttemptListener gets the information via jobTokenSecretManager.
  jobTokenSecretManager.addTokenForJob(
      appAttemptID.getApplicationId().toString(), sessionToken);

  //service to handle requests to TaskUmbilicalProtocol
  taskCommunicatorManager = createTaskCommunicatorManager(context,
      taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors);
  addIfService(taskCommunicatorManager, true);

  containerSignatureMatcher = createContainerSignatureMatcher();
  containers = new AMContainerMap(containerHeartbeatHandler,
      taskCommunicatorManager, containerSignatureMatcher, context);
  addIfService(containers, true);
  dispatcher.register(AMContainerEventType.class, containers);

  nodes = new AMNodeTracker(dispatcher.getEventHandler(), context);
  addIfService(nodes, true);
  dispatcher.register(AMNodeEventType.class, nodes);

  this.dagEventDispatcher = new DagEventDispatcher();
  this.vertexEventDispatcher = new VertexEventDispatcher();

  //register the event dispatchers
  dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler());
  dispatcher.register(DAGEventType.class, dagEventDispatcher);
  dispatcher.register(VertexEventType.class, vertexEventDispatcher);

  // Task and task-attempt events are either registered directly on the main dispatcher or,
  // when the concurrent dispatcher is enabled, on one additional dispatcher shared by both
  // event types.
  boolean useConcurrentDispatcher =
      conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
          TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT);
  LOG.info("Using concurrent dispatcher: " + useConcurrentDispatcher);
  if (!useConcurrentDispatcher) {
    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
    dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
  } else {
    int concurrency = conf.getInt(TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY,
        TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT);
    AsyncDispatcherConcurrent sharedDispatcher = dispatcher.registerAndCreateDispatcher(
        TaskEventType.class, new TaskEventDispatcher(), "TaskAndAttemptEventThread", concurrency);
    dispatcher.registerWithExistingDispatcher(TaskAttemptEventType.class,
        new TaskAttemptEventDispatcher(), sharedDispatcher);
  }

  // register other delegating dispatchers
  dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(),
      "Speculator");

  if (enableWebUIService()) {
    this.webUIService = new WebUIService(context);
    addIfService(webUIService, false);
  } else {
    LOG.debug("Web UI Service is not enabled.");
  }

  this.taskSchedulerManager = createTaskSchedulerManager(taskSchedulerDescriptors);
  addIfService(taskSchedulerManager, true);

  // The task scheduler manager is declared dependent on the web UI service (when enabled)
  // and on the client RPC server.
  if (enableWebUIService()) {
    addIfServiceDependency(taskSchedulerManager, webUIService);
  }

  dispatcher.register(AMSchedulerEventType.class,
      taskSchedulerManager);
  addIfServiceDependency(taskSchedulerManager, clientRpcServer);

  appMasterReadinessService = createAppMasterReadinessService();

  this.containerLauncherManager = createContainerLauncherManager(containerLauncherDescriptors,
      isLocal);
  addIfService(containerLauncherManager, true);
  dispatcher.register(ContainerLauncherEventType.class, containerLauncherManager);

  historyEventHandler = createHistoryEventHandler(context);
  addIfService(historyEventHandler, true);

  this.sessionTimeoutInterval = TezCommonUtils.getDAGSessionTimeout(amConf);
  this.clientAMHeartbeatTimeoutIntervalMillis =
      TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConf);

  // In session mode (and only when versions are compatible) load the AM local resources
  // from the delimited protobuf file in the working directory.
  if (!versionMismatch && isSession) {
    try (BufferedInputStream sessionResourcesStream =
        new BufferedInputStream(
            new FileInputStream(new File(workingDirectory,
                TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)))) {
      PlanLocalResourcesProto amLocalResourceProto = PlanLocalResourcesProto
          .parseDelimitedFrom(sessionResourcesStream);
      amResources.putAll(DagTypeConverters
          .convertFromPlanLocalResources(amLocalResourceProto));
    }
  }

  int threadCount = conf.getInt(TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT,
      TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT);
  // NOTE: LinkedBlockingQueue does not have a capacity limit and can thus
  // occupy large memory chunks when numerous Runnables are pending for execution
  ExecutorService rawExecutor =
      Executors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder()
          .setDaemon(true).setNameFormat("App Shared Pool - #%d").build());
  execService = MoreExecutors.listeningDecorator(rawExecutor);

  initServices(conf);
  super.serviceInit(conf);

  if (!versionMismatch) {
    // The app-level launch event is emitted only by the first attempt; the AM-level launch
    // event is emitted by every attempt.
    if (this.appAttemptID.getAttemptId() == 1) {
      AppLaunchedEvent appLaunchedEvent = new AppLaunchedEvent(appAttemptID.getApplicationId(),
          startTime, appSubmitTime, appMasterUgi.getShortUserName(), this.amConf,
          dagVersionInfo);
      historyEventHandler.handle(
          new DAGHistoryEvent(appLaunchedEvent));
    }
    AMLaunchedEvent launchedEvent = new AMLaunchedEvent(appAttemptID,
        startTime, appSubmitTime, appMasterUgi.getShortUserName());
    historyEventHandler.handle(
        new DAGHistoryEvent(launchedEvent));

    this.state = DAGAppMasterState.INITED;
  } else {
    this.state = DAGAppMasterState.ERROR;
  }
}