public Nimbus()

in storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java [489:631]


    /**
     * Creates a Nimbus instance: registers its metrics, then builds or adopts the collaborators it needs
     * (cluster state, caches, timers, scheduler, blob store, topology cache, leader elector, auth plugins).
     *
     * <p>Several arguments are optional. When {@code hostPortInfo}, {@code stormClusterState}, {@code blobStore},
     * {@code topoCache}, {@code leaderElector} or {@code groupMapper} is {@code null}, a replacement is created
     * from {@code conf} by the matching null check in the body.
     *
     * <p>The statements are order-dependent: e.g. the blob store is resolved before the topology cache, and both
     * (together with the ZK client and cluster state) are resolved before the default leader elector is created.
     *
     * @param conf              the configuration map used to build every default collaborator
     * @param inimbus           if non-null, {@code prepare} is called on it before it is passed to
     *                          {@code makeScheduler}; a null value is stored and passed through unchanged
     * @param stormClusterState cluster state to use, or {@code null} to create one via {@code makeStormClusterState(conf)}
     * @param hostPortInfo      this Nimbus's host/port info, or {@code null} to derive it via {@code NimbusInfo.fromConf(conf)}
     * @param blobStore         blob store to use, or {@code null} to obtain one via {@code ServerUtils.getNimbusBlobStore}
     * @param topoCache         topology cache to use, or {@code null} to create one on top of the resolved blob store
     * @param leaderElector     leader elector to use, or {@code null} to create one via {@code Zookeeper.zkLeaderElector}
     * @param groupMapper       group mapping plugin to use, or {@code null} to load one via
     *                          {@code ClientAuthUtils.getGroupMappingServiceProviderPlugin(conf)}
     * @param metricsRegistry   registry on which all meters, timers and histograms below are registered
     * @throws Exception propagated from any initialization call below, except a metric store configuration failure,
     *                   which is logged and leaves {@code metricsStore} as {@code null}
     */
    public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
                  BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper,
                  StormMetricsRegistry metricsRegistry)
        throws Exception {
        this.conf = conf;
        this.metricsRegistry = metricsRegistry;
        this.resourceMetrics = new ResourceMetrics(metricsRegistry);
        // Call-count meters; names follow the "nimbus:num-<operation>-calls" pattern unless noted otherwise.
        this.submitTopologyWithOptsCalls = metricsRegistry.registerMeter("nimbus:num-submitTopologyWithOpts-calls");
        this.submitTopologyCalls = metricsRegistry.registerMeter("nimbus:num-submitTopology-calls");
        this.killTopologyWithOptsCalls = metricsRegistry.registerMeter("nimbus:num-killTopologyWithOpts-calls");
        this.killTopologyCalls = metricsRegistry.registerMeter("nimbus:num-killTopology-calls");
        this.rebalanceCalls = metricsRegistry.registerMeter("nimbus:num-rebalance-calls");
        this.activateCalls = metricsRegistry.registerMeter("nimbus:num-activate-calls");
        this.deactivateCalls = metricsRegistry.registerMeter("nimbus:num-deactivate-calls");
        this.debugCalls = metricsRegistry.registerMeter("nimbus:num-debug-calls");
        this.setWorkerProfilerCalls = metricsRegistry.registerMeter("nimbus:num-setWorkerProfiler-calls");
        this.getComponentPendingProfileActionsCalls = metricsRegistry.registerMeter(
            "nimbus:num-getComponentPendingProfileActions-calls");
        this.setLogConfigCalls = metricsRegistry.registerMeter("nimbus:num-setLogConfig-calls");
        this.uploadNewCredentialsCalls = metricsRegistry.registerMeter("nimbus:num-uploadNewCredentials-calls");
        this.beginFileUploadCalls = metricsRegistry.registerMeter("nimbus:num-beginFileUpload-calls");
        this.uploadChunkCalls = metricsRegistry.registerMeter("nimbus:num-uploadChunk-calls");
        this.finishFileUploadCalls = metricsRegistry.registerMeter("nimbus:num-finishFileUpload-calls");
        this.downloadChunkCalls = metricsRegistry.registerMeter("nimbus:num-downloadChunk-calls");
        this.getNimbusConfCalls = metricsRegistry.registerMeter("nimbus:num-getNimbusConf-calls");
        this.getLogConfigCalls = metricsRegistry.registerMeter("nimbus:num-getLogConfig-calls");
        this.getTopologyConfCalls = metricsRegistry.registerMeter("nimbus:num-getTopologyConf-calls");
        this.getTopologyCalls = metricsRegistry.registerMeter("nimbus:num-getTopology-calls");
        this.getUserTopologyCalls = metricsRegistry.registerMeter("nimbus:num-getUserTopology-calls");
        this.getClusterInfoCalls = metricsRegistry.registerMeter("nimbus:num-getClusterInfo-calls");
        this.getTopologySummariesCalls = metricsRegistry.registerMeter("nimbus:num-getTopologySummaries-calls");
        this.getTopologySummaryCalls = metricsRegistry.registerMeter("nimbus:num-getTopologySummary-calls");
        this.getTopologySummaryByNameCalls = metricsRegistry.registerMeter("nimbus:num-getTopologySummaryByName-calls");
        this.getLeaderCalls = metricsRegistry.registerMeter("nimbus:num-getLeader-calls");
        this.isTopologyNameAllowedCalls = metricsRegistry.registerMeter("nimbus:num-isTopologyNameAllowed-calls");
        this.getTopologyInfoWithOptsCalls = metricsRegistry.registerMeter(
            "nimbus:num-getTopologyInfoWithOpts-calls");
        this.getTopologyInfoCalls = metricsRegistry.registerMeter("nimbus:num-getTopologyInfo-calls");
        this.getTopologyInfoByNameCalls = metricsRegistry.registerMeter("nimbus:num-getTopologyInfoByName-calls");
        this.getTopologyInfoByNameWithOptsCalls = metricsRegistry.registerMeter("nimbus:num-getTopologyInfoByNameWithOpts-calls");
        this.getTopologyPageInfoCalls = metricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls");
        this.getSupervisorPageInfoCalls = metricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls");
        this.getComponentPageInfoCalls = metricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls");
        this.getOwnerResourceSummariesCalls = metricsRegistry.registerMeter(
            "nimbus:num-getOwnerResourceSummaries-calls");
        this.shutdownCalls = metricsRegistry.registerMeter("nimbus:num-shutdown-calls");
        // NOTE(review): this name lacks the "num-" prefix and uses "metric" rather than the field's "Metrics";
        // it is a published metric name, so do not rename it without checking consumers.
        this.processWorkerMetricsCalls = metricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
        // Error/exception meters.
        this.mkAssignmentsErrors = metricsRegistry.registerMeter("nimbus:mkAssignments-Errors");
        this.sendAssignmentExceptions = metricsRegistry.registerMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS);
        // Duration timers.
        this.fileUploadDuration = metricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
        this.schedulingDuration = metricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
        // Per-scheduling histograms of executor/slot changes.
        this.numAddedExecPerScheduling = metricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
        this.numAddedSlotPerScheduling = metricsRegistry.registerHistogram("nimbus:num-added-slots-per-scheduling");
        this.numRemovedExecPerScheduling = metricsRegistry.registerHistogram("nimbus:num-removed-executors-per-scheduling");
        this.numRemovedSlotPerScheduling = metricsRegistry.registerHistogram("nimbus:num-removed-slots-per-scheduling");
        this.numNetExecIncreasePerScheduling = metricsRegistry.registerHistogram("nimbus:num-net-executors-increase-per-scheduling");
        this.numNetSlotIncreasePerScheduling = metricsRegistry.registerHistogram("nimbus:num-net-slots-increase-per-scheduling");

        // Start from null so the field has a defined value if configuration below throws.
        this.metricsStore = null;
        try {
            this.metricsStore = MetricStoreConfig.configure(conf, metricsRegistry);
        } catch (Exception e) {
            // the metrics store is not critical to the operation of the cluster, allow Nimbus to come up
            LOG.error("Failed to initialize metric store", e);
        }

        // Fall back to the host/port described by the configuration when none was supplied.
        if (hostPortInfo == null) {
            hostPortInfo = NimbusInfo.fromConf(conf);
        }
        this.nimbusHostPortInfo = hostPortInfo;
        // inimbus is optional; prepare it only when present. It is later passed to makeScheduler as-is.
        if (inimbus != null) {
            inimbus.prepare(conf, ServerConfigUtils.masterInimbusDir(conf));
        }

        this.inimbus = inimbus;
        // Both authorization handlers are built by the same factory, from different config keys.
        this.authorizationHandler = StormCommon.mkAuthorizationHandler((String) conf.get(DaemonConfig.NIMBUS_AUTHORIZER), conf);
        this.impersonationAuthorizationHandler =
            StormCommon.mkAuthorizationHandler((String) conf.get(DaemonConfig.NIMBUS_IMPERSONATION_AUTHORIZER), conf);
        this.submittedCount = new AtomicLong(0);
        // Create the cluster state from conf when the caller did not inject one.
        if (stormClusterState == null) {
            stormClusterState = makeStormClusterState(conf);
        }
        this.stormClusterState = stormClusterState;
        this.heartbeatsCache = new HeartbeatCache();
        // Starts out false; this constructor never sets it to true.
        this.heartbeatsReadyFlag = new AtomicBoolean(false);
        this.heartbeatsRecoveryStrategy = WorkerHeartbeatsRecoveryStrategyFactory.getStrategy(conf);
        // Separate cache instances for each direction of file and blob transfer, plus blob listing.
        this.downloaders = fileCacheMap(conf);
        this.uploaders = fileCacheMap(conf);
        this.blobDownloaders = makeBlobCacheMap(conf);
        this.blobUploaders = makeBlobCacheMap(conf);
        this.blobListers = makeBlobListCacheMap(conf);
        this.uptime = Utils.makeUptimeComputer();
        // The validator class name comes from conf, defaulting to DefaultTopologyValidator.
        this.validator = ReflectionUtils
            .newInstance((String) conf.getOrDefault(DaemonConfig.NIMBUS_TOPOLOGY_VALIDATOR, DefaultTopologyValidator.class.getName()));
        // Both timer error handlers log the error and then exit the process with code 20.
        // NOTE(review): the first StormTimer argument is presumably the timer name; it is null for the main timer.
        this.timer = new StormTimer(null, (t, e) -> {
            LOG.error("Error while processing event", e);
            Utils.exitProcess(20, "Error while processing event");
        });
        this.cleanupTimer = new StormTimer("nimbus:cleanupTimer", (t, e) -> {
            LOG.error("Error in cleanupTimer while processing event", e);
            Utils.exitProcess(20, "Error in cleanupTimer while processing event");
        });
        // The scheduler actually used wraps the underlying one; both references are kept.
        this.underlyingScheduler = makeScheduler(conf, inimbus);
        this.scheduler = wrapAsBlacklistScheduler(conf, underlyingScheduler, metricsRegistry);
        this.zkClient = makeZKClient(conf);
        this.idToExecutors = new AtomicReference<>(new HashMap<>());

        // Resolve the blob store first: the default topology cache and leader elector below both depend on it.
        if (blobStore == null) {
            blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo, null);
        }
        this.blobStore = blobStore;

        if (topoCache == null) {
            topoCache = new TopoCache(blobStore, conf);
        }
        // The default elector is built from the collaborators resolved above.
        // NOTE(review): submitLock is not assigned in this constructor; presumably it is initialized at its declaration.
        if (leaderElector == null) {
            leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState, getNimbusAcls(conf),
                metricsRegistry, submitLock);
        }
        this.leaderElector = leaderElector;
        // Hand the elector to the blob store, whether it was injected or created above.
        this.blobStore.setLeaderElector(this.leaderElector);

        this.topoCache = topoCache;
        this.assignmentsDistributer = AssignmentDistributionService.getInstance(conf, this.scheduler);
        // Scheduling/resource bookkeeping maps start empty.
        this.idToSchedStatus = new AtomicReference<>(new HashMap<>());
        this.nodeIdToResources = new AtomicReference<>(new HashMap<>());
        this.idToResources = new AtomicReference<>(new HashMap<>());
        this.idToWorkerResources = new AtomicReference<>(new HashMap<>());
        this.credRenewers = ClientAuthUtils.getCredentialRenewers(conf);
        this.topologyHistoryLock = new Object();
        this.topologyHistoryState = ServerConfigUtils.nimbusTopoHistoryState(conf);
        this.nimbusAutocredPlugins = ClientAuthUtils.getNimbusAutoCredPlugins(conf);
        this.nimbusTopologyActionNotifier = createTopologyActionNotifier(conf);
        this.clusterConsumerExceutors = makeClusterMetricsConsumerExecutors(conf);
        // Load the group mapping plugin from conf when the caller did not inject one.
        if (groupMapper == null) {
            groupMapper = ClientAuthUtils.getGroupMappingServiceProviderPlugin(conf);
        }
        this.groupMapper = groupMapper;
        this.principalToLocal = ClientAuthUtils.getPrincipalToLocalPlugin(conf);
        // We don't use the classpath part of this, so just an empty list
        this.supervisorClasspaths = Collections.unmodifiableNavigableMap(Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST));
        clusterMetricSet = new ClusterSummaryMetricSet(metricsRegistry);
    }