in src/main/java/com/aliyun/emr/example/storm/benchmark/WordCount.java [21:37]
protected void setBolt(TopologyBuilder builder) {
int clusterCores = Integer.valueOf(configure.getProperty("cluster.cores.total"));
int availableCores = clusterCores - Integer.valueOf(configure.getProperty("partition.number"));
int hdfsParallelismFactor = Integer.parseInt(configure.getProperty("hdfs.parallelism.factor"));
int hdfsParallelism = availableCores * hdfsParallelismFactor / (hdfsParallelismFactor + 1);
builder.setBolt("split-count", new SplitCount(), availableCores - hdfsParallelism).localOrShuffleGrouping("spout");
String filenamePrefix = configure.getProperty("filename.prefix") + configure.getProperty("name") + "/";
HdfsBolt bolt = new HdfsBolt()
.withFsUrl(configure.getProperty("url"))
.withFileNameFormat(new DefaultFileNameFormat().withPrefix(filenamePrefix))
.withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter(","))
.withSyncPolicy(new CountSyncPolicy(1000))
.withRotationPolicy(new NoRotationPolicy());
builder.setBolt("hdfs-bolt", bolt, hdfsParallelism).localOrShuffleGrouping("split-count");
}