in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java [70:119]
protected void connectSensors() {
super.connectSensors();
Integer rawPort = getAttribute(HTTP_PORT);
String hostname = getAttribute(HOSTNAME);
checkNotNull(rawPort, "HTTP_PORT sensors not set for %s; is an acceptable port available?", this);
HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, rawPort);
Function<Maybe<JsonElement>, String> getNodeId = new Function<Maybe<JsonElement>, String>() {
@Override public String apply(Maybe<JsonElement> input) {
if (input.isAbsent()) {
return null;
}
return input.get().getAsJsonObject().entrySet().iterator().next().getKey();
}
};
sensors().set(DATASTORE_URL, String.format("http://%s", hp));
if (isHttpMonitoringEnabled()) {
boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS);
httpFeed = HttpFeed.builder()
.entity(this)
.period(1000)
.baseUri(String.format("http://%s:%s/_nodes/_local/stats", hp.getHost(), hp.getPort()))
.poll(new HttpPollConfig<Boolean>(SERVICE_UP)
.onSuccess(HttpValueFunctions.responseCodeEquals(200))
.onFailureOrException(Functions.constant(false)))
.poll(new HttpPollConfig<String>(NODE_ID)
.onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), MaybeFunctions.<JsonElement>wrap(), JsonFunctions.walkM("nodes"), getNodeId))
.onFailureOrException(Functions.constant("")))
.poll(new HttpPollConfig<String>(CLUSTER_NAME)
.onSuccess(HttpValueFunctions.jsonContents("cluster_name", String.class)))
.poll(getSensorFromNodeStat(NODE_NAME, "name"))
.poll(getSensorFromNodeStat(DOCUMENT_COUNT, "indices", "docs", "count")
.enabled(retrieveUsageMetrics))
.poll(getSensorFromNodeStat(STORE_BYTES, "indices", "store", "size_in_bytes")
.enabled(retrieveUsageMetrics))
.poll(getSensorFromNodeStat(GET_TOTAL, "indices", "get", "total")
.enabled(retrieveUsageMetrics))
.poll(getSensorFromNodeStat(GET_TIME_IN_MILLIS, "indices", "get", "time_in_millis")
.enabled(retrieveUsageMetrics))
.poll(getSensorFromNodeStat(SEARCH_QUERY_TOTAL, "indices", "search", "query_total")
.enabled(retrieveUsageMetrics))
.poll(getSensorFromNodeStat(SEARCH_QUERY_TIME_IN_MILLIS, "indices", "search", "query_time_in_millis")
.enabled(retrieveUsageMetrics))
.build();
} else {
connectServiceUpIsRunning();
}
}