protected void setBolt()

in src/main/java/com/aliyun/emr/example/storm/benchmark/WordCount.java [21:37]


    /**
     * Registers the two word-count bolts on the topology: "split-count", which
     * subscribes to "spout", and "hdfs-bolt", which subscribes to "split-count".
     *
     * <p>The cores remaining after subtracting "partition.number" from
     * "cluster.cores.total" are divided between the two bolts. "hdfs-bolt" is
     * given {@code factor / (factor + 1)} of them (integer division, truncated),
     * where {@code factor} is "hdfs.parallelism.factor"; "split-count" is given
     * whatever is left.
     *
     * @param builder topology builder that both bolts are added to
     * @throws NumberFormatException if one of the numeric properties cannot be
     *         parsed as an int (presumably also when the property is absent and
     *         {@code getProperty} yields null; confirm against the type of
     *         {@code configure})
     */
    protected void setBolt(TopologyBuilder builder) {
        // parseInt yields a primitive directly; valueOf would box and then unbox.
        int clusterCores = Integer.parseInt(configure.getProperty("cluster.cores.total"));
        int partitionCount = Integer.parseInt(configure.getProperty("partition.number"));
        int availableCores = clusterCores - partitionCount;

        // Split availableCores in the ratio factor : 1 between hdfs-bolt and split-count.
        // NOTE(review): a factor of -1 divides by zero, and small inputs can leave
        // either bolt with a parallelism of zero or less (e.g. availableCores = 1,
        // factor = 1 gives hdfs-bolt 0). Verify how the Storm version in use treats
        // such a parallelism hint.
        int hdfsParallelismFactor = Integer.parseInt(configure.getProperty("hdfs.parallelism.factor"));
        int hdfsParallelism = availableCores * hdfsParallelismFactor / (hdfsParallelismFactor + 1);
        int splitCountParallelism = availableCores - hdfsParallelism;
        builder.setBolt("split-count", new SplitCount(), splitCountParallelism).localOrShuffleGrouping("spout");

        // Output file prefix is "<filename.prefix><name>/".
        String filenamePrefix = configure.getProperty("filename.prefix") + configure.getProperty("name") + "/";
        HdfsBolt bolt = new HdfsBolt()
            .withFsUrl(configure.getProperty("url"))
            .withFileNameFormat(new DefaultFileNameFormat().withPrefix(filenamePrefix))
            .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter(","))
            // Presumably syncs after every 1000 tuples and never rotates files;
            // confirm against the storm-hdfs policy classes.
            .withSyncPolicy(new CountSyncPolicy(1000))
            .withRotationPolicy(new NoRotationPolicy());
        builder.setBolt("hdfs-bolt", bolt, hdfsParallelism).localOrShuffleGrouping("split-count");
    }