private LocalCluster()

in storm-server/src/main/java/org/apache/storm/LocalCluster.java [177:294]


    private LocalCluster(Builder builder) throws Exception {
        if (builder.simulateTime) {
            time = new SimulatedTime();
        } else {
            time = null;
        }
        boolean success = false;
        try {
            this.trackId = builder.trackId;
            if (trackId != null) {
                ConcurrentHashMap<String, AtomicInteger> metrics = new ConcurrentHashMap<>();
                metrics.put("spout-emitted", new AtomicInteger(0));
                metrics.put("transferred", new AtomicInteger(0));
                metrics.put("processed", new AtomicInteger(0));
                this.commonInstaller = new StormCommonInstaller(new TrackedStormCommon(this.trackId));
                LOG.warn("Adding tracked metrics for ID {}", this.trackId);
                RegisteredGlobalState.setState(this.trackId, metrics);
                LocalExecutor.setTrackId(this.trackId);
            } else {
                this.commonInstaller = null;
            }

            this.tmpDirs = new ArrayList<>();
            this.supervisors = new ArrayList<>();
            TmpPath nimbusTmp = new TmpPath();
            this.tmpDirs.add(nimbusTmp);
            stormHomeBackup = System.getProperty(ConfigUtils.STORM_HOME);
            TmpPath stormHome = new TmpPath();
            if (!stormHome.getFile().mkdirs()) {
                throw new IllegalStateException("Failed to create storm.home directory " + stormHome.getPath());
            }
            this.tmpDirs.add(stormHome);
            System.setProperty(ConfigUtils.STORM_HOME, stormHome.getPath());
            Map<String, Object> conf = ConfigUtils.readStormConfig();
            conf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
            conf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
            conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 50);
            conf.put(Config.STORM_CLUSTER_MODE, "local");
            conf.put(Config.BLOBSTORE_DIR, nimbusTmp.getPath());
            conf.put(Config.TOPOLOGY_MIN_REPLICATION_COUNT, 1);

            InProcessZookeeper zookeeper = null;
            if (!builder.daemonConf.containsKey(Config.STORM_ZOOKEEPER_SERVERS)) {
                zookeeper = new InProcessZookeeper();
                conf.put(Config.STORM_ZOOKEEPER_PORT, zookeeper.getPort());
                conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
            }
            this.zookeeper = zookeeper;
            conf.putAll(builder.daemonConf);
            this.daemonConf = new HashMap<>(conf);
            this.metricRegistry = new StormMetricsRegistry();

            this.portCounter = builder.supervisorSlotPortMin;
            ClusterStateContext cs = new ClusterStateContext(DaemonType.NIMBUS, daemonConf);
            this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, cs);
            if (builder.clusterState == null) {
                clusterState = ClusterUtils.mkStormClusterState(this.daemonConf, cs);
            } else {
                this.clusterState = builder.clusterState;
            }
            if (!Time.isSimulating()) {
                //Ensure Nimbus assigns topologies as quickly as possible
                conf.put(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS, 1);
            }
            //Set it for nimbus only
            conf.put(Config.STORM_LOCAL_DIR, nimbusTmp.getPath());
            Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new StandaloneINimbus() : builder.inimbus,
                                       this.getClusterState(), null, builder.store, builder.topoCache, builder.leaderElector,
                                       builder.groupMapper, metricRegistry);
            if (builder.nimbusWrapper != null) {
                nimbus = builder.nimbusWrapper.apply(nimbus);
            }
            this.nimbus = nimbus;
            this.nimbus.launchServer();
            if (!this.nimbus.awaitLeadership(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                //Ensure Nimbus has leadership, otherwise topology submission will fail.
                throw new RuntimeException("LocalCluster Nimbus failed to gain leadership.");
            }
            IContext context = null;
            if (!ObjectReader.getBoolean(this.daemonConf.get(Config.STORM_LOCAL_MODE_ZMQ), false)) {
                context = new Context();
                context.prepare(this.daemonConf);
            }
            this.sharedContext = context;
            this.thriftServer = builder.nimbusDaemon ? startNimbusDaemon(this.daemonConf, this.nimbus) : null;

            for (int i = 0; i < builder.supervisors; i++) {
                addSupervisor(builder.portsPerSupervisor, null, null);
            }

            //Wait for a leader to be elected (or topology submission can be rejected)
            try {
                long timeoutAfter = System.currentTimeMillis() + 10_000;
                while (!hasLeader()) {
                    if (timeoutAfter > System.currentTimeMillis()) {
                        throw new IllegalStateException("Timed out waiting for nimbus to become the leader");
                    }
                    Thread.sleep(1);
                }
            } catch (Exception e) {
                //Ignore any exceptions we might be doing a test for authentication
            }
            if (thriftServer == null) {
                //We don't want to override the client if there is a thrift server up and running, or we would not test any
                // Of the actual thrift code
                this.nimbusOverride = new NimbusClient.LocalOverride(this);
            } else {
                this.nimbusOverride = null;
            }
            success = true;
            
            metricRegistry.startMetricsReporters(daemonConf);
        } finally {
            if (!success) {
                close();
            }
        }
    }