in storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java [1418:1569]
public void launchServer() throws Exception {
try {
LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
validator.prepare(conf);
IStormClusterState state = stormClusterState;
NimbusInfo hpi = nimbusHostPortInfo;
//add to nimbuses
NimbusSummary nimbusSummary = new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(), false, STORM_VERSION);
nimbusSummary.set_tlsPort(hpi.getTlsPort());
state.addNimbusHost(hpi.getHost(), nimbusSummary);
leaderElector.addToLeaderLockQueue();
this.blobStore.startSyncBlobs();
for (ClusterMetricsConsumerExecutor exec: clusterConsumerExceutors) {
exec.prepare();
}
// Leadership coordination may be incomplete when launchServer is called. Previous behavior did a one time check
// which could cause Nimbus to not process TopologyActions.GAIN_LEADERSHIP transitions. Similar problem exists for
// HA Nimbus on being newly elected as leader. Change to a recurring pattern addresses these problems.
timer.scheduleRecurring(3, 5,
() -> {
try {
boolean isLeader = isLeader();
if (isLeader && !wasLeader) {
for (String topoId : state.activeStorms()) {
transition(topoId, TopologyActions.GAIN_LEADERSHIP, null);
}
clusterMetricSet.setActive(true);
}
wasLeader = isLeader;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
final boolean doNotReassign = (Boolean) conf.getOrDefault(ServerConfigUtils.NIMBUS_DO_NOT_REASSIGN, false);
timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)),
() -> {
try {
if (!doNotReassign) {
mkAssignments();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// Schedule topology cleanup
cleanupTimer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)),
() -> {
cleanupTimer.schedule(0, () -> doCleanup());
});
// Schedule Nimbus inbox cleaner
final int jarExpSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_INBOX_JAR_EXPIRATION_SECS));
timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_CLEANUP_INBOX_FREQ_SECS)),
() -> {
try {
cleanInbox(getInbox(), jarExpSecs);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// Schedule topology history cleaner
Integer interval = ObjectReader.getInt(conf.get(DaemonConfig.LOGVIEWER_CLEANUP_INTERVAL_SECS), null);
if (interval != null) {
final int lvCleanupAgeMins = ObjectReader.getInt(conf.get(DaemonConfig.LOGVIEWER_CLEANUP_AGE_MINS));
timer.scheduleRecurring(0, interval,
() -> {
try {
cleanTopologyHistory(lvCleanupAgeMins);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_CREDENTIAL_RENEW_FREQ_SECS)),
() -> {
try {
renewCredentials();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// Periodically make sure the blobstore update time is up to date. This could have failed if Nimbus encountered
// an exception updating the update time, or due to bugs causing a missed update of the blobstore mod time on a blob
// update.
timer.scheduleRecurring(30, ServerConfigUtils.getLocalizerUpdateBlobInterval(conf) * 5,
() -> {
try {
blobStore.validateBlobUpdateTime();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
metricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values()
.parallelStream()
.mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0))
.sum());
metricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values()
.parallelStream()
.mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableCpu(), 0))
.sum());
metricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values()
.parallelStream()
.mapToDouble(SupervisorResources::getTotalMem)
.sum());
metricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values()
.parallelStream()
.mapToDouble(SupervisorResources::getTotalCpu)
.sum());
metricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
//We want to update longest scheduling time in real time in case scheduler get stuck
// Get current time before startTime to avoid potential race with scheduler's Timer
Long currTime = Time.nanoTime();
Long startTime = schedulingStartTimeNs.get();
return TimeUnit.NANOSECONDS.toMillis(startTime == null
? longestSchedulingTime.get()
: Math.max(currTime - startTime, longestSchedulingTime.get()));
});
metricsRegistry.registerMeter("nimbus:num-launched").mark();
timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
() -> {
try {
if (isLeader()) {
sendClusterMetricsToExecutors();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
timer.scheduleRecurring(5, 5, clusterMetricSet);
} catch (Exception e) {
if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
throw e;
}
if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
throw e;
}
LOG.error("Error on initialization of nimbus", e);
Utils.exitProcess(13, "Error on initialization of nimbus");
}
}