in src/main/java/com/aliyun/emr/example/storm/benchmark/AbstractTopology.java [46:77]
public void run(boolean cluster) throws Exception {
String name = configure.getProperty("name");
Config conf = new Config();
if (!cluster) {
new LocalCluster().submitTopology("local-" + name, conf, createTopology());
return;
}
int slots = Integer.valueOf(configure.getProperty("worker.slot.number"));
int clusterNodes = Integer.valueOf(configure.getProperty("cluster.worker.node.number"));
int workerNumber = slots * clusterNodes;
int clusterNodeMemoryMb = Integer.valueOf(configure.getProperty("cluster.memory.per.node.mb"));
int workerMem = clusterNodeMemoryMb / slots;
conf.setNumWorkers(workerNumber);
if (!Boolean.valueOf(configure.getProperty("ack.open"))) {
conf.setNumAckers(0);
}
conf.put("worker.heap.memory.mb", workerMem);
conf.put("topology.backpressure.enable", Boolean.valueOf(configure.getProperty("backpressure.enable")));
StormSubmitter.submitTopologyWithProgressBar(name, conf, createTopology());
Helper.setupShutdownHook(name); // handle Ctrl-C
System.out.println("**********metrics will begin in two minute, please start to send source data to warn up**********");
for (int i = 0; i< 2; i++) {
Thread.sleep(1000 * 60);
System.out.println("...");
}
System.out.println("********** start metrics **********");
Helper.collectMetrics(name, 60);
}