in src/main/java/com/aliyun/emr/example/storm/benchmark/WindowedWordCount.java [55:73]
// Processes one window of tuples.
//
// For every tuple in the window, field 0 is read as a map whose keys are used
// as event times and whose values are whitespace-separated text. Each word is
// tallied in the `counter` field, and one (eventTime, current wall-clock
// millis) tuple is emitted per map entry.
// NOTE(review): the emitted pair looks intended for downstream latency
// measurement -- confirm against the consumer of this stream.
public void execute(TupleWindow inputWindow) {
    for (Tuple tuple : inputWindow.get()) {
        // The payload type cannot be verified at runtime beyond "is a Map";
        // the producer is trusted to supply String keys and values.
        @SuppressWarnings("unchecked")
        Map<String, String> value = (Map<String, String>) tuple.getValue(0);
        for (Map.Entry<String, String> item : value.entrySet()) {
            String eventTime = item.getKey();
            String words = item.getValue();
            for (String word : words.split("\\s+")) {
                // split() yields an empty leading token when the text starts
                // with whitespace (and a lone "" for empty text); that is not
                // a word and must not be counted.
                if (word.isEmpty()) {
                    continue;
                }
                Integer number = counter.get(word);
                counter.put(word, number == null ? 1 : number + 1);
            }
            // Emitted once per map entry, after all of its words were counted.
            collector.emit(new Values(eventTime, System.currentTimeMillis()));
        }
    }
}