in src/main/java/com/aliyun/emr/example/storm/benchmark/BasicTopology.java [29:44]
private void setSpout(TopologyBuilder builder) {
String consumerGroup = configure.getProperty("consumer.group");
SpoutConfig conf = new SpoutConfig(new ZkHosts(
configure.getProperty("zookeeper.address") + ":2181" + configure.getProperty("zookeeper.root")),
configure.getProperty("topic"), configure.getProperty("zookeeper.root"), consumerGroup);
conf.zkPort = 2181;
conf.zkServers= Arrays.asList(configure.getProperty("zookeeper.address"));
conf.socketTimeoutMs = 60 * 1000;
conf.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
conf.startOffsetTime= OffsetRequest.LatestTime();
conf.ignoreZkOffsets = true;
KafkaSpout spout = new KafkaSpout(conf);
int kafkaPartition = Integer.valueOf(configure.getProperty("partition.number"));
builder.setSpout("spout", spout, kafkaPartition);
}