public void run()

in processor/groupby/src/main/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessor.java [95:125]


        /**
         * Flushes the accumulated events: builds one merged property map for
         * each group held in {@code accumulation}, posts it to every topic
         * listed in the comma-separated {@code targetTopics}, and finally
         * empties {@code accumulation}.
         * <p>
         * Every merged map carries {@code "processor" = "groupBy"}. When
         * {@code flat} is set, the properties of all events of a group are
         * copied into that single map, so an event iterated later overwrites an
         * earlier one on a shared property name. Otherwise the map holds an
         * {@code "events"} list containing one property map per event.
         * <p>
         * Does nothing when {@code accumulation} is empty.
         */
        public void run() {
            if (accumulation.isEmpty()) {
                return;
            }
            // The topic list is the same for every group, so split it once per run.
            String[] topics = targetTopics.split(",");
            // Only the grouped events are needed here, not the hash keys.
            for (Iterable<Event> group : accumulation.values()) {
                Map<String, Object> merge = new HashMap<>();
                merge.put("processor", "groupBy");
                if (flat) {
                    for (Event event : group) {
                        for (String propertyName : event.getPropertyNames()) {
                            merge.put(propertyName, event.getProperty(propertyName));
                        }
                    }
                } else {
                    List<Map<String, Object>> events = new ArrayList<>();
                    merge.put("events", events);
                    for (Event event : group) {
                        Map<String, Object> properties = new HashMap<>();
                        for (String propertyName : event.getPropertyNames()) {
                            properties.put(propertyName, event.getProperty(propertyName));
                        }
                        events.add(properties);
                    }
                }
                // send event
                for (String topic : topics) {
                    // Tolerate whitespace around the comma separators ("a, b").
                    dispatcher.postEvent(new Event(topic.trim(), merge));
                }
            }
            accumulation.clear();
        }