in src/main/java/com/aliyun/emr/example/storm/benchmark/WordCount.java [44:61]
public void execute(Tuple input, BasicOutputCollector collector) {
Map<String, String> value = (Map<String, String>)input.getValue(0);
for (Map.Entry<String, String>item : value.entrySet()) {
String eventTime = item.getKey();
String words = item.getValue();
for (String word : words.split("\\s+")) {
Integer number = counter.get(word);
if (number == null) {
number = 0;
}
number++;
counter.put(word, number);
}
collector.emit(new Values(eventTime, System.currentTimeMillis()));
}
}