private void setSpout()

in src/main/java/com/aliyun/emr/example/storm/benchmark/BasicTopology.java [29:44]


    private void setSpout(TopologyBuilder builder) {
        String consumerGroup = configure.getProperty("consumer.group");
        SpoutConfig conf = new SpoutConfig(new ZkHosts(
            configure.getProperty("zookeeper.address") + ":2181" + configure.getProperty("zookeeper.root")),
            configure.getProperty("topic"), configure.getProperty("zookeeper.root"), consumerGroup);
        conf.zkPort = 2181;
        conf.zkServers= Arrays.asList(configure.getProperty("zookeeper.address"));
        conf.socketTimeoutMs = 60 * 1000;
        conf.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
        conf.startOffsetTime= OffsetRequest.LatestTime();
        conf.ignoreZkOffsets = true;
        KafkaSpout spout = new KafkaSpout(conf);

        int kafkaPartition = Integer.valueOf(configure.getProperty("partition.number"));
        builder.setSpout("spout", spout, kafkaPartition);
    }