in storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java [489:631]
public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper,
StormMetricsRegistry metricsRegistry)
throws Exception {
this.conf = conf;
this.metricsRegistry = metricsRegistry;
this.resourceMetrics = new ResourceMetrics(metricsRegistry);
this.submitTopologyWithOptsCalls = metricsRegistry.registerMeter("nimbus:num-submitTopologyWithOpts-calls");
this.submitTopologyCalls = metricsRegistry.registerMeter("nimbus:num-submitTopology-calls");
this.killTopologyWithOptsCalls = metricsRegistry.registerMeter("nimbus:num-killTopologyWithOpts-calls");
this.killTopologyCalls = metricsRegistry.registerMeter("nimbus:num-killTopology-calls");
this.rebalanceCalls = metricsRegistry.registerMeter("nimbus:num-rebalance-calls");
this.activateCalls = metricsRegistry.registerMeter("nimbus:num-activate-calls");
this.deactivateCalls = metricsRegistry.registerMeter("nimbus:num-deactivate-calls");
this.debugCalls = metricsRegistry.registerMeter("nimbus:num-debug-calls");
this.setWorkerProfilerCalls = metricsRegistry.registerMeter("nimbus:num-setWorkerProfiler-calls");
this.getComponentPendingProfileActionsCalls = metricsRegistry.registerMeter(
"nimbus:num-getComponentPendingProfileActions-calls");
this.setLogConfigCalls = metricsRegistry.registerMeter("nimbus:num-setLogConfig-calls");
this.uploadNewCredentialsCalls = metricsRegistry.registerMeter("nimbus:num-uploadNewCredentials-calls");
this.beginFileUploadCalls = metricsRegistry.registerMeter("nimbus:num-beginFileUpload-calls");
this.uploadChunkCalls = metricsRegistry.registerMeter("nimbus:num-uploadChunk-calls");
this.finishFileUploadCalls = metricsRegistry.registerMeter("nimbus:num-finishFileUpload-calls");
this.downloadChunkCalls = metricsRegistry.registerMeter("nimbus:num-downloadChunk-calls");
this.getNimbusConfCalls = metricsRegistry.registerMeter("nimbus:num-getNimbusConf-calls");
this.getLogConfigCalls = metricsRegistry.registerMeter("nimbus:num-getLogConfig-calls");
this.getTopologyConfCalls = metricsRegistry.registerMeter("nimbus:num-getTopologyConf-calls");
this.getTopologyCalls = metricsRegistry.registerMeter("nimbus:num-getTopology-calls");
this.getUserTopologyCalls = metricsRegistry.registerMeter("nimbus:num-getUserTopology-calls");
this.getClusterInfoCalls = metricsRegistry.registerMeter("nimbus:num-getClusterInfo-calls");
this.getTopologySummariesCalls = metricsRegistry.registerMeter("nimbus:num-getTopologySummaries-calls");
this.getTopologySummaryCalls = metricsRegistry.registerMeter("nimbus:num-getTopologySummary-calls");
this.getTopologySummaryByNameCalls = metricsRegistry.registerMeter("nimbus:num-getTopologySummaryByName-calls");
this.getLeaderCalls = metricsRegistry.registerMeter("nimbus:num-getLeader-calls");
this.isTopologyNameAllowedCalls = metricsRegistry.registerMeter("nimbus:num-isTopologyNameAllowed-calls");
this.getTopologyInfoWithOptsCalls = metricsRegistry.registerMeter(
"nimbus:num-getTopologyInfoWithOpts-calls");
this.getTopologyInfoCalls = metricsRegistry.registerMeter("nimbus:num-getTopologyInfo-calls");
this.getTopologyInfoByNameCalls = metricsRegistry.registerMeter("nimbus:num-getTopologyInfoByName-calls");
this.getTopologyInfoByNameWithOptsCalls = metricsRegistry.registerMeter("nimbus:num-getTopologyInfoByNameWithOpts-calls");
this.getTopologyPageInfoCalls = metricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls");
this.getSupervisorPageInfoCalls = metricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls");
this.getComponentPageInfoCalls = metricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls");
this.getOwnerResourceSummariesCalls = metricsRegistry.registerMeter(
"nimbus:num-getOwnerResourceSummaries-calls");
this.shutdownCalls = metricsRegistry.registerMeter("nimbus:num-shutdown-calls");
this.processWorkerMetricsCalls = metricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
this.mkAssignmentsErrors = metricsRegistry.registerMeter("nimbus:mkAssignments-Errors");
this.sendAssignmentExceptions = metricsRegistry.registerMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS);
this.fileUploadDuration = metricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
this.schedulingDuration = metricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
this.numAddedExecPerScheduling = metricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
this.numAddedSlotPerScheduling = metricsRegistry.registerHistogram("nimbus:num-added-slots-per-scheduling");
this.numRemovedExecPerScheduling = metricsRegistry.registerHistogram("nimbus:num-removed-executors-per-scheduling");
this.numRemovedSlotPerScheduling = metricsRegistry.registerHistogram("nimbus:num-removed-slots-per-scheduling");
this.numNetExecIncreasePerScheduling = metricsRegistry.registerHistogram("nimbus:num-net-executors-increase-per-scheduling");
this.numNetSlotIncreasePerScheduling = metricsRegistry.registerHistogram("nimbus:num-net-slots-increase-per-scheduling");
this.metricsStore = null;
try {
this.metricsStore = MetricStoreConfig.configure(conf, metricsRegistry);
} catch (Exception e) {
// the metrics store is not critical to the operation of the cluster, allow Nimbus to come up
LOG.error("Failed to initialize metric store", e);
}
if (hostPortInfo == null) {
hostPortInfo = NimbusInfo.fromConf(conf);
}
this.nimbusHostPortInfo = hostPortInfo;
if (inimbus != null) {
inimbus.prepare(conf, ServerConfigUtils.masterInimbusDir(conf));
}
this.inimbus = inimbus;
this.authorizationHandler = StormCommon.mkAuthorizationHandler((String) conf.get(DaemonConfig.NIMBUS_AUTHORIZER), conf);
this.impersonationAuthorizationHandler =
StormCommon.mkAuthorizationHandler((String) conf.get(DaemonConfig.NIMBUS_IMPERSONATION_AUTHORIZER), conf);
this.submittedCount = new AtomicLong(0);
if (stormClusterState == null) {
stormClusterState = makeStormClusterState(conf);
}
this.stormClusterState = stormClusterState;
this.heartbeatsCache = new HeartbeatCache();
this.heartbeatsReadyFlag = new AtomicBoolean(false);
this.heartbeatsRecoveryStrategy = WorkerHeartbeatsRecoveryStrategyFactory.getStrategy(conf);
this.downloaders = fileCacheMap(conf);
this.uploaders = fileCacheMap(conf);
this.blobDownloaders = makeBlobCacheMap(conf);
this.blobUploaders = makeBlobCacheMap(conf);
this.blobListers = makeBlobListCacheMap(conf);
this.uptime = Utils.makeUptimeComputer();
this.validator = ReflectionUtils
.newInstance((String) conf.getOrDefault(DaemonConfig.NIMBUS_TOPOLOGY_VALIDATOR, DefaultTopologyValidator.class.getName()));
this.timer = new StormTimer(null, (t, e) -> {
LOG.error("Error while processing event", e);
Utils.exitProcess(20, "Error while processing event");
});
this.cleanupTimer = new StormTimer("nimbus:cleanupTimer", (t, e) -> {
LOG.error("Error in cleanupTimer while processing event", e);
Utils.exitProcess(20, "Error in cleanupTimer while processing event");
});
this.underlyingScheduler = makeScheduler(conf, inimbus);
this.scheduler = wrapAsBlacklistScheduler(conf, underlyingScheduler, metricsRegistry);
this.zkClient = makeZKClient(conf);
this.idToExecutors = new AtomicReference<>(new HashMap<>());
if (blobStore == null) {
blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo, null);
}
this.blobStore = blobStore;
if (topoCache == null) {
topoCache = new TopoCache(blobStore, conf);
}
if (leaderElector == null) {
leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState, getNimbusAcls(conf),
metricsRegistry, submitLock);
}
this.leaderElector = leaderElector;
this.blobStore.setLeaderElector(this.leaderElector);
this.topoCache = topoCache;
this.assignmentsDistributer = AssignmentDistributionService.getInstance(conf, this.scheduler);
this.idToSchedStatus = new AtomicReference<>(new HashMap<>());
this.nodeIdToResources = new AtomicReference<>(new HashMap<>());
this.idToResources = new AtomicReference<>(new HashMap<>());
this.idToWorkerResources = new AtomicReference<>(new HashMap<>());
this.credRenewers = ClientAuthUtils.getCredentialRenewers(conf);
this.topologyHistoryLock = new Object();
this.topologyHistoryState = ServerConfigUtils.nimbusTopoHistoryState(conf);
this.nimbusAutocredPlugins = ClientAuthUtils.getNimbusAutoCredPlugins(conf);
this.nimbusTopologyActionNotifier = createTopologyActionNotifier(conf);
this.clusterConsumerExceutors = makeClusterMetricsConsumerExecutors(conf);
if (groupMapper == null) {
groupMapper = ClientAuthUtils.getGroupMappingServiceProviderPlugin(conf);
}
this.groupMapper = groupMapper;
this.principalToLocal = ClientAuthUtils.getPrincipalToLocalPlugin(conf);
// We don't use the classpath part of this, so just an empty list
this.supervisorClasspaths = Collections.unmodifiableNavigableMap(Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST));
clusterMetricSet = new ClusterSummaryMetricSet(metricsRegistry);
}