in processor/groupby/src/main/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessor.java [95:125]
/**
 * Flushes the accumulated event groups.
 *
 * <p>For every group currently held in {@code accumulation}, builds one merged
 * property map and posts it as an {@link Event} to each topic listed in the
 * comma-separated {@code targetTopics}. Once all groups have been posted the
 * accumulation is cleared. Nothing happens when the accumulation is empty.
 *
 * <p>The merged map always starts with {@code "processor" -> "groupBy"}:
 * <ul>
 *   <li>when {@code flat} is set, the properties of every event of the group are
 *       copied into that single map, so a later event overwrites an earlier
 *       one on equal property names (including {@code "processor"});</li>
 *   <li>otherwise the map gets an {@code "events"} entry holding a list with one
 *       property map per event of the group.</li>
 * </ul>
 */
public void run() {
    if (accumulation.isEmpty()) {
        return;
    }
    // The topic list does not depend on the group being flushed: split it once.
    String[] topics = targetTopics.split(",");
    for (Integer hash : accumulation.keySet()) {
        Map<String, Object> merge = new HashMap<>();
        merge.put("processor", "groupBy");
        if (flat) {
            for (Event event : accumulation.get(hash)) {
                copyProperties(event, merge);
            }
        } else {
            List<Map<String, Object>> events = new ArrayList<>();
            merge.put("events", events);
            for (Event event : accumulation.get(hash)) {
                Map<String, Object> properties = new HashMap<>();
                copyProperties(event, properties);
                events.add(properties);
            }
        }
        // send event
        for (String topic : topics) {
            // Tolerate "a, b" and "a,,b": whitespace around a name or an empty
            // name is not a usable topic (Event is expected to reject it, which
            // would abort the flush before the accumulation is cleared).
            String topicName = topic.trim();
            if (topicName.isEmpty()) {
                continue;
            }
            dispatcher.postEvent(new Event(topicName, merge));
        }
    }
    accumulation.clear();
}

/**
 * Copies every property of {@code event} into {@code target}, replacing any
 * entry already present under the same name.
 *
 * @param event  the event whose properties are read
 * @param target the map receiving the properties
 */
private void copyProperties(Event event, Map<String, Object> target) {
    for (String propertyName : event.getPropertyNames()) {
        target.put(propertyName, event.getProperty(propertyName));
    }
}