public void run(boolean cluster) throws Exception

in src/main/java/com/aliyun/emr/example/storm/benchmark/AbstractTopology.java [46:77]


    /**
     * Submits the topology produced by {@code createTopology()} and, in cluster mode,
     * starts metrics collection after a two-minute warm-up wait.
     *
     * <p>Local mode ({@code cluster == false}): the topology is submitted to a freshly
     * created {@code LocalCluster} under the name {@code "local-" + name} and the method
     * returns immediately; none of the sizing properties are read.
     *
     * <p>Cluster mode reads the following properties from {@code configure}:
     * <ul>
     *   <li>{@code worker.slot.number} - must be a positive integer</li>
     *   <li>{@code cluster.worker.node.number} - integer; worker count is slots * nodes</li>
     *   <li>{@code cluster.memory.per.node.mb} - integer; divided by slots for the
     *       per-worker heap setting</li>
     *   <li>{@code ack.open} - ackers are set to 0 unless this parses as {@code true}</li>
     *   <li>{@code backpressure.enable} - copied into {@code topology.backpressure.enable}</li>
     * </ul>
     *
     * @param cluster {@code true} to submit through {@code StormSubmitter}; {@code false}
     *                to run in a {@code LocalCluster}
     * @throws NumberFormatException    if a required integer property is missing or is not
     *                                  a valid integer (the message names the property)
     * @throws IllegalArgumentException if {@code worker.slot.number} is not positive
     * @throws Exception                anything propagated from topology submission, the
     *                                  warm-up sleep (interruption) or metrics collection
     */
    public void run(boolean cluster) throws Exception {
        String name = configure.getProperty("name");
        Config conf = new Config();

        if (!cluster) {
            new LocalCluster().submitTopology("local-" + name, conf, createTopology());
            return;
        }

        int slots = requireIntProperty("worker.slot.number");
        int clusterNodes = requireIntProperty("cluster.worker.node.number");
        int clusterNodeMemoryMb = requireIntProperty("cluster.memory.per.node.mb");

        // slots is used as a divisor below; fail with an explanatory message instead of
        // an ArithmeticException (0) or silently negative sizing (< 0).
        if (slots <= 0) {
            throw new IllegalArgumentException(
                    "Property 'worker.slot.number' must be a positive integer but was " + slots);
        }

        int workerNumber = slots * clusterNodes;
        int workerMem = clusterNodeMemoryMb / slots; // integer division: remainder is dropped

        conf.setNumWorkers(workerNumber);
        // A missing or non-"true" value leaves acking switched off.
        if (!Boolean.parseBoolean(configure.getProperty("ack.open"))) {
            conf.setNumAckers(0);
        }

        conf.put("worker.heap.memory.mb", workerMem);
        conf.put("topology.backpressure.enable", Boolean.valueOf(configure.getProperty("backpressure.enable")));
        StormSubmitter.submitTopologyWithProgressBar(name, conf, createTopology());
        Helper.setupShutdownHook(name); // handle Ctrl-C

        // Wait two minutes (one progress marker per minute) before collecting metrics.
        System.out.println("**********metrics will begin in two minute, please start to send source data to warn up**********");
        for (int minute = 0; minute < 2; minute++) {
            Thread.sleep(1000 * 60);
            System.out.println("...");
        }
        System.out.println("********** start metrics **********");
        // NOTE(review): the meaning of the second argument (60) is defined by
        // Helper.collectMetrics, which is not visible here - confirm its unit.
        Helper.collectMetrics(name, 60);
    }

    /**
     * Reads a required integer property from {@code configure}.
     *
     * @param key property name to look up
     * @return the parsed integer value
     * @throws NumberFormatException if the property is absent or not a valid integer;
     *                               the message identifies the offending key
     */
    private int requireIntProperty(String key) {
        String raw = configure.getProperty(key);
        if (raw == null) {
            throw new NumberFormatException("Missing required integer property '" + key + "'");
        }
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            // NumberFormatException has no cause-taking constructor, so attach it explicitly.
            NumberFormatException detailed = new NumberFormatException(
                    "Property '" + key + "' must be an integer but was '" + raw + "'");
            detailed.initCause(e);
            throw detailed;
        }
    }