in collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java [130:187]
/**
 * Runs one collection cycle: queries Elasticsearch and posts the result as an event.
 *
 * <p>The search request is built from the collector configuration:
 * <ul>
 *   <li>{@code index} - index to search, {@code decanter} when not set;</li>
 *   <li>{@code query} - query in query-string syntax passed as the {@code q} parameter;
 *       when not set no query is sent, which makes Elasticsearch match all documents;</li>
 *   <li>{@code from}, {@code size} - optional integers forwarded to the request;</li>
 *   <li>{@code timeout} - optional timeout string forwarded to the request.</li>
 * </ul>
 *
 * <p>The method blocks until the search has completed so that the event data is fully
 * populated (and no longer mutated by another thread) before it is handed to the dispatcher.
 * If the search fails, the error is logged and an event holding only the already collected
 * data (at least {@code type}) is still posted.
 *
 * @throws NumberFormatException if {@code from} or {@code size} is configured with a value
 *                               that is not a valid integer
 */
public void run() {
    SearchRequest.Builder searchRequestBuilder = new SearchRequest.Builder();

    Object indexValue = config.get("index");
    searchRequestBuilder.index((indexValue != null) ? indexValue.toString() : "decanter");

    // "q" expects query-string syntax, so it is only set when a query is configured.
    // Without any query Elasticsearch matches all documents, which is the intended default.
    Object queryValue = config.get("query");
    if (queryValue != null) {
        searchRequestBuilder.q(queryValue.toString());
    }

    Object fromValue = config.get("from");
    if (fromValue != null) {
        searchRequestBuilder.from(Integer.parseInt(fromValue.toString()));
    }

    Object sizeValue = config.get("size");
    if (sizeValue != null) {
        searchRequestBuilder.size(Integer.parseInt(sizeValue.toString()));
    }

    Object timeoutValue = config.get("timeout");
    if (timeoutValue != null) {
        searchRequestBuilder.timeout(timeoutValue.toString());
    }

    Map<String, Object> data = new HashMap<>();
    data.put("type", "elasticsearch");

    try {
        // NOTE(review): the transport wraps the shared restClient and is intentionally not
        // closed here - closing it presumably closes restClient as well; confirm.
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        new ElasticsearchAsyncClient(transport)
                .search(searchRequestBuilder.build(), Map.class)
                .thenAccept(response -> {
                    data.put("timeout", response.timedOut());
                    data.put("totalShards", response.shards().total().intValue());
                    data.put("successfulShards", response.shards().successful().intValue());
                    data.put("failedShards", response.shards().failed().intValue());
                    for (Hit<?> hit : response.hits().hits()) {
                        data.putAll(hit.fields());
                    }
                })
                // Wait for the response: otherwise the event below would be posted before
                // the callback has filled the map, and asynchronous failures would be lost.
                .join();
    } catch (Exception e) {
        LOGGER.error("Can't query elasticsearch", e);
    }

    try {
        PropertiesPreparator.prepare(data, config);
    } catch (Exception e) {
        LOGGER.warn("Can't prepare event", e);
    }

    Object topicValue = config.get(EventConstants.EVENT_TOPIC);
    String topic = (topicValue != null) ? topicValue.toString() : "decanter/collect/elasticsearch";
    dispatcher.postEvent(new Event(topic, data));
}