in src/main/java/com/aliyun/emr/example/storm/benchmark/WindowedWordCount.java [24:42]
protected void setBolt(TopologyBuilder builder) {
int windowLength = Integer.valueOf(configure.getProperty("window.length"));
int clusterCores = Integer.valueOf(configure.getProperty("cluster.cores.total"));
int availableCores = clusterCores - Integer.valueOf(configure.getProperty("partition.number"));
int parallelism = availableCores / 2;
int slidingInterval = Integer.valueOf(configure.getProperty("slide.interval"));
builder.setBolt("count", new SplitCount().withWindow(new Count(windowLength), new Count(slidingInterval)), parallelism).localOrShuffleGrouping("spout");
String filenamePrefix = configure.getProperty("filename.prefix") + configure.getProperty("name") + "/";
HdfsBolt bolt = new HdfsBolt()
.withFsUrl(configure.getProperty("url"))
.withFileNameFormat(new DefaultFileNameFormat().withPrefix(filenamePrefix))
.withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter(","))
.withSyncPolicy(new CountSyncPolicy(1000))
.withRotationPolicy(new NoRotationPolicy());
builder.setBolt("hdfs-bolt", bolt, parallelism).localOrShuffleGrouping("count");
}