in src/main/java/com/aliyun/emr/example/storm/benchmark/KafkaHdfs.java [23:40]
/**
 * Builds the bolt that writes one text line per incoming tuple to HDFS.
 *
 * <p>The file system URL is read from the {@code url} property. The file name prefix is the
 * concatenation of the {@code filename.prefix} property, the {@code name} property and a
 * trailing {@code "/"}.
 *
 * <p>Each record has the form {@code <firstKey>,<currentTimeMillis>} followed by the platform
 * line separator, where {@code firstKey} is the first key of the map found at position 0 of the
 * tuple. Records are encoded as UTF-8 so the bytes written do not depend on the default charset
 * of the JVM the bolt happens to run in.
 *
 * @return the configured HDFS bolt, using {@code CountSyncPolicy(1000)} and
 *     {@code NoRotationPolicy}
 */
private IRichBolt getHdfsBolt() {
    // NOTE(review): a missing property is concatenated as the literal text "null" — confirm
    // that both properties are always present in the configuration.
    String filenamePrefix =
        configure.getProperty("filename.prefix") + configure.getProperty("name") + "/";

    RecordFormat recordFormat = new RecordFormat() {
        @Override
        public byte[] format(Tuple tuple) {
            // The cast cannot be checked at runtime because of erasure; the first tuple field
            // is presumably a String-to-String map produced upstream — verify against the spout.
            @SuppressWarnings("unchecked")
            Map<String, String> payload = (Map<String, String>) tuple.getValue(0);

            // The first key is used as the event time (an empty map throws
            // NoSuchElementException here, exactly as before).
            String eventTime = payload.keySet().iterator().next();
            String output = eventTime + "," + System.currentTimeMillis() + System.lineSeparator();

            // Explicit charset: the no-arg getBytes() uses the platform default encoding.
            return output.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
    };

    return new HdfsBolt()
        .withFsUrl(configure.getProperty("url"))
        .withFileNameFormat(new DefaultFileNameFormat().withPrefix(filenamePrefix))
        .withRecordFormat(recordFormat)
        .withSyncPolicy(new CountSyncPolicy(1000))
        .withRotationPolicy(new NoRotationPolicy());
}