in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java [763:997]
/**
 * Builds the components used by this service, publishes them on
 * {@code rmContext}, wires their event handlers into {@code rmDispatcher},
 * registers them as child services through {@code addService} /
 * {@code addIfService}, and finally delegates to {@code super.serviceInit}.
 *
 * <p>The statements are kept in their original order on purpose: later
 * components are handed {@code rmContext} after earlier ones were stored on
 * it, and child services are registered in the order they appear here.
 *
 * <p>NOTE(review): the body reads {@code conf} from the enclosing scope and
 * never reads the {@code configuration} parameter - presumably intentional,
 * confirm against the enclosing class.
 *
 * @param configuration the configuration passed in by the caller; not read
 *                      by this method (see note above)
 * @throws Exception if the state store fails to initialize (logged, then
 *                   rethrown), if federation is enabled without a cluster
 *                   id ({@code YarnRuntimeException}), or if any other
 *                   step, including {@code super.serviceInit}, fails
 */
protected void serviceInit(Configuration configuration) throws Exception {
  standByTransitionRunnable = new StandByTransitionRunnable();

  rmSecretManagerService = createRMSecretManagerService();
  addService(rmSecretManagerService);

  containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
  addService(containerAllocationExpirer);
  rmContext.setContainerAllocationExpirer(containerAllocationExpirer);

  // Two separate monitor instances are created from the same factory:
  // one stored as the AM liveliness monitor, one as the AM finishing monitor.
  AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
  addService(amLivelinessMonitor);
  rmContext.setAMLivelinessMonitor(amLivelinessMonitor);

  AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
  addService(amFinishingMonitor);
  rmContext.setAMFinishingMonitor(amFinishingMonitor);

  RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
  addService(rmAppLifetimeMonitor);
  rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);

  RMNodeLabelsManager nlm = createNodeLabelManager();
  nlm.setRMContext(rmContext);
  addService(nlm);
  rmContext.setNodeLabelManager(nlm);

  NodeAttributesManager nam = createNodeAttributesManager();
  addService(nam);
  rmContext.setNodeAttributesManager(nam);

  // The allocation tags manager is only stored on the context; it is not
  // registered as a child service here.
  AllocationTagsManager allocationTagsManager =
      createAllocationTagsManager();
  rmContext.setAllocationTagsManager(allocationTagsManager);

  PlacementConstraintManagerService placementConstraintManager =
      createPlacementConstraintManager();
  addService(placementConstraintManager);
  rmContext.setPlacementConstraintManager(placementConstraintManager);

  // add resource profiles here because it's used by AbstractYarnScheduler
  // (initialized directly instead of being added as a child service)
  ResourceProfilesManager resourceProfilesManager =
      createResourceProfileManager();
  resourceProfilesManager.init(conf);
  rmContext.setResourceProfilesManager(resourceProfilesManager);

  MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager =
      createMultiNodeSortingManager();
  multiNodeSortingManager.setRMContext(rmContext);
  addService(multiNodeSortingManager);
  rmContext.setMultiNodeSortingManager(multiNodeSortingManager);

  // The factory may return null, in which case nothing is registered.
  RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
      createRMDelegatedNodeLabelsUpdater();
  if (delegatedNodeLabelsUpdater != null) {
    addService(delegatedNodeLabelsUpdater);
    rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
  }

  // Pick the state store: a configured store when recovery is enabled,
  // otherwise a NullRMStateStore.
  recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
      YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);

  RMStateStore rmStore;
  if (recoveryEnabled) {
    rmStore = RMStateStoreFactory.getStore(conf);
    boolean isWorkPreservingRecoveryEnabled =
        conf.getBoolean(
            YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
            YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
    rmContext
        .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
  } else {
    rmStore = new NullRMStateStore();
  }

  try {
    rmStore.setResourceManager(rm);
    rmStore.init(conf);
    rmStore.setRMDispatcher(rmDispatcher);
  } catch (Exception e) {
    // the Exception from stateStore.init() needs to be handled for
    // HA and we need to give up master status if we got fenced
    LOG.error("Failed to init state store", e);
    throw e;
  }
  rmContext.setStateStore(rmStore);

  // The delegation token renewer is created only when security is enabled;
  // it is registered as a child service further below.
  if (UserGroupInformation.isSecurityEnabled()) {
    delegationTokenRenewer = createDelegationTokenRenewer();
    rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
  }

  // Register event handler for NodesListManager
  nodesListManager = new NodesListManager(rmContext);
  rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
  addService(nodesListManager);
  rmContext.setNodesListManager(nodesListManager);

  // Initialize the scheduler
  scheduler = createScheduler();
  scheduler.setRMContext(rmContext);
  addIfService(scheduler);
  rmContext.setScheduler(scheduler);

  schedulerDispatcher = createSchedulerEventDispatcher();
  addIfService(schedulerDispatcher);
  rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);

  // Register event handler for RmAppEvents
  rmDispatcher.register(RMAppEventType.class,
      new ApplicationEventDispatcher(rmContext));

  // Register event handler for RmAppAttemptEvents
  rmDispatcher.register(RMAppAttemptEventType.class,
      new ApplicationAttemptEventDispatcher(rmContext));

  // Register event handler for RmNodes
  rmDispatcher.register(
      RMNodeEventType.class, new NodeEventDispatcher(rmContext));

  nmLivelinessMonitor = createNMLivelinessMonitor();
  addService(nmLivelinessMonitor);

  resourceTracker = createResourceTrackerService();
  addService(resourceTracker);
  rmContext.setResourceTrackerService(resourceTracker);

  // Metrics: when fromActive is set the existing jvmMetrics instance is
  // re-attached to the metrics system; otherwise a new singleton is created.
  MetricsSystem ms = DefaultMetricsSystem.initialize("ResourceManager");
  if (fromActive) {
    JvmMetrics.reattach(ms, jvmMetrics);
    UserGroupInformation.reattachMetrics();
  } else {
    jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
  }

  JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
  addService(pauseMonitor);
  jvmMetrics.setPauseMonitor(pauseMonitor);

  // Initialize the Reservation system
  if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
      YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
    reservationSystem = createReservationSystem();
    if (reservationSystem != null) {
      reservationSystem.setRMContext(rmContext);
      addIfService(reservationSystem);
      rmContext.setReservationSystem(reservationSystem);
      LOG.info("Initialized Reservation system");
    }
  }

  masterService = createApplicationMasterService();
  createAndRegisterOpportunisticDispatcher(masterService);
  addService(masterService);
  rmContext.setApplicationMasterService(masterService);

  applicationACLsManager = new ApplicationACLsManager(conf);

  queueACLsManager = createQueueACLsManager(scheduler, conf);

  rmAppManager = createRMAppManager();
  // Register event handler for RMAppManagerEvents
  rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);

  clientRM = createClientRMService();
  addService(clientRM);
  rmContext.setClientRMService(clientRM);

  applicationMasterLauncher = createAMLauncher();
  rmDispatcher.register(AMLauncherEventType.class,
      applicationMasterLauncher);

  addService(applicationMasterLauncher);
  // Registers the renewer created earlier under the same security check.
  if (UserGroupInformation.isSecurityEnabled()) {
    addService(delegationTokenRenewer);
    delegationTokenRenewer.setRMContext(rmContext);
  }

  // Federation requires a non-empty cluster id; fail initialization
  // otherwise.
  if (HAUtil.isFederationEnabled(conf)) {
    String cId = YarnConfiguration.getClusterId(conf);
    if (cId.isEmpty()) {
      String errMsg =
          "Cannot initialize RM as Federation is enabled"
              + " but cluster id is not configured.";
      LOG.error(errMsg);
      throw new YarnRuntimeException(errMsg);
    }
    federationStateStoreService = createFederationStateStoreService();
    addIfService(federationStateStoreService);
    rmAppManager.setFederationStateStoreService(federationStateStoreService);
    LOG.info("Initialized Federation membership.");
  }

  proxyCAManager = new ProxyCAManager(new ProxyCA(), rmContext);
  addService(proxyCAManager);
  rmContext.setProxyCAManager(proxyCAManager);

  rmnmInfo = new RMNMInfo(rmContext, scheduler);

  if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
      false)) {
    SystemServiceManager systemServiceManager = createServiceManager();
    addIfService(systemServiceManager);
  }

  // Add volume manager to RM context when it is necessary, i.e. when
  // VolumeAMSProcessor is listed among the configured AMS processors.
  String[] amsProcessorList = conf.getStrings(
      YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS);
  if (amsProcessorList != null && Arrays.stream(amsProcessorList)
      .anyMatch(s -> VolumeAMSProcessor.class.getName().equals(s))) {
    VolumeManager volumeManager = new VolumeManagerImpl();
    rmContext.setVolumeManager(volumeManager);
    addIfService(volumeManager);
  }

  // Publish the RM and scheduler event queue sizes to ClusterMetrics once
  // per second from a single daemon thread.
  eventQueueMetricExecutor = new ScheduledThreadPoolExecutor(1,
      new ThreadFactoryBuilder().
          setDaemon(true).setNameFormat("EventQueueSizeMetricThread").
          build());
  eventQueueMetricExecutor.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      // An exception escaping a fixed-rate task suppresses every later run,
      // so each dispatcher is type-checked instead of being cast blindly;
      // a dispatcher of another type (or null) simply skips its metric.
      Object rmEventDispatcher = getRMContext().getDispatcher();
      if (rmEventDispatcher instanceof AsyncDispatcher) {
        int rmEventQueueSize =
            ((AsyncDispatcher) rmEventDispatcher).getEventQueueSize();
        ClusterMetrics.getMetrics().setRmEventQueueSize(rmEventQueueSize);
      }
      if (schedulerDispatcher instanceof EventDispatcher) {
        int schedulerEventQueueSize =
            ((EventDispatcher) schedulerDispatcher).getEventQueueSize();
        ClusterMetrics.getMetrics().
            setSchedulerEventQueueSize(schedulerEventQueueSize);
      }
    }
  }, 1, 1, TimeUnit.SECONDS);

  super.serviceInit(conf);
}