protected void setBolt()

in src/main/java/com/aliyun/emr/example/storm/benchmark/WindowedWordCount.java [24:42]


    protected void setBolt(TopologyBuilder builder) {
        int windowLength = Integer.valueOf(configure.getProperty("window.length"));
        int clusterCores = Integer.valueOf(configure.getProperty("cluster.cores.total"));
        int availableCores = clusterCores - Integer.valueOf(configure.getProperty("partition.number"));
        int parallelism =  availableCores / 2;

        int slidingInterval = Integer.valueOf(configure.getProperty("slide.interval"));

        builder.setBolt("count", new SplitCount().withWindow(new Count(windowLength), new Count(slidingInterval)), parallelism).localOrShuffleGrouping("spout");

        String filenamePrefix = configure.getProperty("filename.prefix") + configure.getProperty("name") + "/";
        HdfsBolt bolt = new HdfsBolt()
            .withFsUrl(configure.getProperty("url"))
            .withFileNameFormat(new DefaultFileNameFormat().withPrefix(filenamePrefix))
            .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter(","))
            .withSyncPolicy(new CountSyncPolicy(1000))
            .withRotationPolicy(new NoRotationPolicy());
        builder.setBolt("hdfs-bolt", bolt, parallelism).localOrShuffleGrouping("count");
    }