public void run()

in collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java [130:187]


    public void run() {
        SearchRequest.Builder searchRequestBuilder = new SearchRequest.Builder();

        String index = (config.get("index") != null) ? config.get("index").toString() : "decanter";
        searchRequestBuilder.index(index);

        String query = (config.get("query") != null) ? config.get("query").toString() : null;
        QueryBase queryBase;

        if (query == null) {
            searchRequestBuilder.q(QueryBuilders.matchAll().build().toString());
        } else {
            searchRequestBuilder.q(query);
        }

        String fromString = (config.get("from") != null) ? config.get("from").toString() : null;
        if (fromString != null) {
            int from = Integer.parseInt(fromString);
            searchRequestBuilder.from(from);
        }
        String sizeString = (config.get("size") != null) ? config.get("size").toString() : null;
        if (sizeString != null) {
            int size = Integer.parseInt(sizeString);
            searchRequestBuilder.size(size);
        }
        String timeoutString = (config.get("timeout") != null) ? config.get("timeout").toString() : null;
        if (timeoutString != null) {
            searchRequestBuilder.timeout(timeoutString);
        }

        Map<String, Object> data = new HashMap<>();
        data.put("type", "elasticsearch");
        try {
            ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
            new ElasticsearchAsyncClient(transport).search(searchRequestBuilder.build(), Map.class)
                    .thenApply((response) -> {
                        data.put("timeout", response.timedOut());
                        data.put("totalShards", response.shards().total().intValue());
                        data.put("successfulShards", response.shards().successful().intValue());
                        data.put("failedShards", response.shards().failed().intValue());
                        for (Hit hit : response.hits().hits()) {
                            data.putAll(hit.fields());
                        }
                        return null;
                    });
        } catch (Exception e) {
            LOGGER.error("Can't query elasticsearch", e);
        }
        try {
            PropertiesPreparator.prepare(data, config);
        } catch (Exception e) {
            LOGGER.warn("Can't prepare event", e);
        }

        String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/elasticsearch";

        dispatcher.postEvent(new Event(topic, data));
    }