public void launchServer()

in storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java [1418:1569]


    public void launchServer() throws Exception {
        try {
            LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
            validator.prepare(conf);

            IStormClusterState state = stormClusterState;
            NimbusInfo hpi = nimbusHostPortInfo;

            //add to nimbuses
            NimbusSummary nimbusSummary = new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(), false, STORM_VERSION);
            nimbusSummary.set_tlsPort(hpi.getTlsPort());
            state.addNimbusHost(hpi.getHost(), nimbusSummary);
            leaderElector.addToLeaderLockQueue();
            this.blobStore.startSyncBlobs();

            for (ClusterMetricsConsumerExecutor exec: clusterConsumerExceutors) {
                exec.prepare();
            }

            // Leadership coordination may be incomplete when launchServer is called. Previous behavior did a one time check
            // which could cause Nimbus to not process TopologyActions.GAIN_LEADERSHIP transitions. Similar problem exists for
            // HA Nimbus on being newly elected as leader. Change to a recurring pattern addresses these problems.
            timer.scheduleRecurring(3, 5,
                () -> {
                    try {
                        boolean isLeader = isLeader();
                        if (isLeader && !wasLeader) {
                            for (String topoId : state.activeStorms()) {
                                transition(topoId, TopologyActions.GAIN_LEADERSHIP, null);
                            }
                            clusterMetricSet.setActive(true);
                        }
                        wasLeader = isLeader;
                    } catch (Exception e) {
                        throw  new RuntimeException(e);
                    }
                });

            final boolean doNotReassign = (Boolean) conf.getOrDefault(ServerConfigUtils.NIMBUS_DO_NOT_REASSIGN, false);
            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)),
                () -> {
                    try {
                        if (!doNotReassign) {
                            mkAssignments();
                        }
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            // Schedule topology cleanup
            cleanupTimer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)),
                () -> {
                    cleanupTimer.schedule(0, () -> doCleanup());
                });
            // Schedule Nimbus inbox cleaner
            final int jarExpSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_INBOX_JAR_EXPIRATION_SECS));
            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_CLEANUP_INBOX_FREQ_SECS)),
                () -> {
                    try {
                        cleanInbox(getInbox(), jarExpSecs);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });


            // Schedule topology history cleaner
            Integer interval = ObjectReader.getInt(conf.get(DaemonConfig.LOGVIEWER_CLEANUP_INTERVAL_SECS), null);
            if (interval != null) {
                final int lvCleanupAgeMins = ObjectReader.getInt(conf.get(DaemonConfig.LOGVIEWER_CLEANUP_AGE_MINS));
                timer.scheduleRecurring(0, interval,
                    () -> {
                        try {
                            cleanTopologyHistory(lvCleanupAgeMins);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    });
            }

            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_CREDENTIAL_RENEW_FREQ_SECS)),
                () -> {
                    try {
                        renewCredentials();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });

            // Periodically make sure the blobstore update time is up to date.  This could have failed if Nimbus encountered
            // an exception updating the update time, or due to bugs causing a missed update of the blobstore mod time on a blob
            // update.
            timer.scheduleRecurring(30, ServerConfigUtils.getLocalizerUpdateBlobInterval(conf) * 5,
                () -> {
                    try {
                        blobStore.validateBlobUpdateTime();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });

            metricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values()
                    .parallelStream()
                    .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0))
                    .sum());
            metricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values()
                    .parallelStream()
                    .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableCpu(), 0))
                    .sum());
            metricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values()
                    .parallelStream()
                    .mapToDouble(SupervisorResources::getTotalMem)
                    .sum());
            metricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values()
                    .parallelStream()
                    .mapToDouble(SupervisorResources::getTotalCpu)
                    .sum());
            metricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
                //We want to update longest scheduling time in real time in case scheduler get stuck
                // Get current time before startTime to avoid potential race with scheduler's Timer
                Long currTime = Time.nanoTime();
                Long startTime = schedulingStartTimeNs.get();
                return TimeUnit.NANOSECONDS.toMillis(startTime == null
                        ? longestSchedulingTime.get()
                        : Math.max(currTime - startTime, longestSchedulingTime.get()));
            });
            metricsRegistry.registerMeter("nimbus:num-launched").mark();

            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
                () -> {
                    try {
                        if (isLeader()) {
                            sendClusterMetricsToExecutors();
                        }
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });

            timer.scheduleRecurring(5, 5, clusterMetricSet);
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                throw e;
            }

            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
                throw e;
            }
            LOG.error("Error on initialization of nimbus", e);
            Utils.exitProcess(13, "Error on initialization of nimbus");
        }
    }