in storm-server/src/main/java/org/apache/storm/LocalCluster.java [177:294]
private LocalCluster(Builder builder) throws Exception {
if (builder.simulateTime) {
time = new SimulatedTime();
} else {
time = null;
}
boolean success = false;
try {
this.trackId = builder.trackId;
if (trackId != null) {
ConcurrentHashMap<String, AtomicInteger> metrics = new ConcurrentHashMap<>();
metrics.put("spout-emitted", new AtomicInteger(0));
metrics.put("transferred", new AtomicInteger(0));
metrics.put("processed", new AtomicInteger(0));
this.commonInstaller = new StormCommonInstaller(new TrackedStormCommon(this.trackId));
LOG.warn("Adding tracked metrics for ID {}", this.trackId);
RegisteredGlobalState.setState(this.trackId, metrics);
LocalExecutor.setTrackId(this.trackId);
} else {
this.commonInstaller = null;
}
this.tmpDirs = new ArrayList<>();
this.supervisors = new ArrayList<>();
TmpPath nimbusTmp = new TmpPath();
this.tmpDirs.add(nimbusTmp);
stormHomeBackup = System.getProperty(ConfigUtils.STORM_HOME);
TmpPath stormHome = new TmpPath();
if (!stormHome.getFile().mkdirs()) {
throw new IllegalStateException("Failed to create storm.home directory " + stormHome.getPath());
}
this.tmpDirs.add(stormHome);
System.setProperty(ConfigUtils.STORM_HOME, stormHome.getPath());
Map<String, Object> conf = ConfigUtils.readStormConfig();
conf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
conf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 50);
conf.put(Config.STORM_CLUSTER_MODE, "local");
conf.put(Config.BLOBSTORE_DIR, nimbusTmp.getPath());
conf.put(Config.TOPOLOGY_MIN_REPLICATION_COUNT, 1);
InProcessZookeeper zookeeper = null;
if (!builder.daemonConf.containsKey(Config.STORM_ZOOKEEPER_SERVERS)) {
zookeeper = new InProcessZookeeper();
conf.put(Config.STORM_ZOOKEEPER_PORT, zookeeper.getPort());
conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
}
this.zookeeper = zookeeper;
conf.putAll(builder.daemonConf);
this.daemonConf = new HashMap<>(conf);
this.metricRegistry = new StormMetricsRegistry();
this.portCounter = builder.supervisorSlotPortMin;
ClusterStateContext cs = new ClusterStateContext(DaemonType.NIMBUS, daemonConf);
this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, cs);
if (builder.clusterState == null) {
clusterState = ClusterUtils.mkStormClusterState(this.daemonConf, cs);
} else {
this.clusterState = builder.clusterState;
}
if (!Time.isSimulating()) {
//Ensure Nimbus assigns topologies as quickly as possible
conf.put(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS, 1);
}
//Set it for nimbus only
conf.put(Config.STORM_LOCAL_DIR, nimbusTmp.getPath());
Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new StandaloneINimbus() : builder.inimbus,
this.getClusterState(), null, builder.store, builder.topoCache, builder.leaderElector,
builder.groupMapper, metricRegistry);
if (builder.nimbusWrapper != null) {
nimbus = builder.nimbusWrapper.apply(nimbus);
}
this.nimbus = nimbus;
this.nimbus.launchServer();
if (!this.nimbus.awaitLeadership(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
//Ensure Nimbus has leadership, otherwise topology submission will fail.
throw new RuntimeException("LocalCluster Nimbus failed to gain leadership.");
}
IContext context = null;
if (!ObjectReader.getBoolean(this.daemonConf.get(Config.STORM_LOCAL_MODE_ZMQ), false)) {
context = new Context();
context.prepare(this.daemonConf);
}
this.sharedContext = context;
this.thriftServer = builder.nimbusDaemon ? startNimbusDaemon(this.daemonConf, this.nimbus) : null;
for (int i = 0; i < builder.supervisors; i++) {
addSupervisor(builder.portsPerSupervisor, null, null);
}
//Wait for a leader to be elected (or topology submission can be rejected)
try {
long timeoutAfter = System.currentTimeMillis() + 10_000;
while (!hasLeader()) {
if (timeoutAfter > System.currentTimeMillis()) {
throw new IllegalStateException("Timed out waiting for nimbus to become the leader");
}
Thread.sleep(1);
}
} catch (Exception e) {
//Ignore any exceptions we might be doing a test for authentication
}
if (thriftServer == null) {
//We don't want to override the client if there is a thrift server up and running, or we would not test any
// Of the actual thrift code
this.nimbusOverride = new NimbusClient.LocalOverride(this);
} else {
this.nimbusOverride = null;
}
success = true;
metricRegistry.startMetricsReporters(daemonConf);
} finally {
if (!success) {
close();
}
}
}