in src/main/java/com/aliyun/emr/example/storm/benchmark/TridentWordCount.java [68:86]
public void execute(TridentTuple tuple, TridentCollector collector) {
// for test
Map<String, String> kv = (Map<String, String>)tuple.get(0);
for (Map.Entry<String, String> item: kv.entrySet()) {
String eventTime = item.getKey();
String words = item.getValue();
for (String word: words.split("\\s+")) {
Integer number = count.get(word);
if (number == null) {
number = 0;
}
number++;
count.put(word, number);
}
collector.emit(new Values(eventTime, System.currentTimeMillis()));
}
}