private IRichBolt getHdfsBolt()

in src/main/java/com/aliyun/emr/example/storm/benchmark/KafkaHdfs.java [23:40]


    /**
     * Builds the HDFS sink bolt for this topology.
     *
     * <p>The bolt is configured from the {@code configure} properties:
     * <ul>
     *   <li>{@code url} - file system URL handed to {@code withFsUrl};</li>
     *   <li>{@code filename.prefix} and {@code name} - concatenated, followed by
     *       {@code "/"}, to form the prefix given to the file name format.</li>
     * </ul>
     *
     * <p>Every tuple is written as one line of the form
     * {@code <eventTime>,<currentTimeMillis>} terminated by the system line
     * separator, where {@code eventTime} is the first key of the map stored in
     * tuple field 0. The record is encoded as UTF-8 so the output does not depend
     * on the default charset of the JVM running the worker.
     *
     * @return the configured bolt
     */
    private IRichBolt getHdfsBolt() {
        // NOTE(review): a missing property is concatenated as the literal "null" - confirm
        // that both keys are always present in the configuration.
        final String filenamePrefix =
            configure.getProperty("filename.prefix") + configure.getProperty("name") + "/";

        return new HdfsBolt()
            .withFsUrl(configure.getProperty("url"))
            .withFileNameFormat(new DefaultFileNameFormat().withPrefix(filenamePrefix))
            .withRecordFormat(new RecordFormat() {
                @Override
                public byte[] format(Tuple tuple) {
                    // Field 0 is expected to hold a map; its first key is used as the event time.
                    // A wildcard cast avoids an unchecked-cast warning; the key is still
                    // required to be a String, exactly as before.
                    // NOTE(review): an empty map makes next() throw NoSuchElementException -
                    // presumably the upstream component always emits a non-empty map; verify.
                    Map<?, ?> payload = (Map<?, ?>) tuple.getValue(0);
                    String eventTime = (String) payload.keySet().iterator().next();
                    String output = eventTime + "," + System.currentTimeMillis() + System.lineSeparator();
                    // Pin the charset: the no-arg getBytes() uses the platform default, which
                    // can differ between worker hosts.
                    return output.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                }
            })
            .withSyncPolicy(new CountSyncPolicy(1000))
            .withRotationPolicy(new NoRotationPolicy());
    }