in brooklyn-library/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixFeed.java [318:457]
/**
 * Sets up the feed: builds the HTTP client, schedules a recurring job that registers the
 * entity as a host with the Zabbix server, and schedules one polling job for each
 * configured {@link ZabbixPollConfig}.
 *
 * <p>The registration job returns {@code null} whenever it has nothing to do (already
 * registered, or the data needed to register is not available yet); the registration
 * handler treats {@code null} as a no-op. Polling jobs fail with an
 * {@link IllegalStateException} until registration has completed.
 */
protected void preStart() {
    final Supplier<URI> baseUriProvider = getConfig(BASE_URI_PROVIDER);
    final Function<? super EntityLocal, String> uniqueHostnameGenerator = getConfig(UNIQUE_HOSTNAME_GENERATOR);
    final Integer groupId = getConfig(GROUP_ID);
    final Integer templateId = getConfig(TEMPLATE_ID);
    final Set<ZabbixPollConfig<?>> polls = getConfig(POLLS);

    log.info("starting zabbix feed for {}", entity);

    // TODO if supplier returns null, we may wish to defer initialization until url available?
    // TODO for https should we really trust all?
    final HttpClient httpClient = HttpTool.httpClientBuilder()
            .trustAll()
            .clientConnectionManager(new ThreadSafeClientConnManager())
            .reuseStrategy(new NoConnectionReuseStrategy())
            .uri(baseUriProvider.get())
            .build();

    // Registration job, calls Zabbix host.create API
    final Callable<HttpToolResponse> registerJob = new Callable<HttpToolResponse>() {
        @Override
        public HttpToolResponse call() throws Exception {
            if (!registered.get()) {
                // Find the first machine, if available
                Optional<Location> location = Iterables.tryFind(entity.getLocations(), Predicates.instanceOf(MachineLocation.class));
                if (!location.isPresent()) {
                    return null; // Do nothing until location is present
                }
                MachineLocation machine = (MachineLocation) location.get();

                String host = uniqueHostnameGenerator.apply(entity);

                // Select address and port using port-forwarding if available
                String address = entity.getAttribute(Attributes.ADDRESS);
                Integer port = entity.getAttribute(ZabbixMonitored.ZABBIX_AGENT_PORT);
                if (port == null) {
                    // Unboxing a missing port below would throw; wait for the sensor instead
                    if (log.isTraceEnabled()) log.trace("zabbix registration deferred for {}: agent port not yet available", entity);
                    return null;
                }
                if (machine instanceof SupportsPortForwarding) {
                    Cidr management = entity.getConfig(BrooklynAccessUtils.MANAGEMENT_ACCESS_CIDR);
                    HostAndPort forwarded = ((SupportsPortForwarding) machine).getSocketEndpointFor(management, port);
                    address = forwarded.getHostText();
                    port = forwarded.getPort();
                }
                if (address == null) {
                    // String.replace rejects a null replacement; wait for the sensor instead
                    if (log.isTraceEnabled()) log.trace("zabbix registration deferred for {}: address not yet available", entity);
                    return null;
                }

                // Fill in the JSON template and POST it; the body is encoded explicitly as
                // UTF-8 rather than with the platform default charset
                byte[] body = JSON_HOST_CREATE
                        .replace("{{token}}", entity.getConfig(ZabbixMonitored.ZABBIX_SERVER).getAttribute(ZabbixServer.ZABBIX_TOKEN))
                        .replace("{{host}}", host)
                        .replace("{{ip}}", address)
                        .replace("{{port}}", Integer.toString(port))
                        .replace("{{groupId}}", Integer.toString(groupId))
                        .replace("{{templateId}}", Integer.toString(templateId))
                        .replace("{{id}}", Integer.toString(id.incrementAndGet()))
                        .getBytes(java.nio.charset.StandardCharsets.UTF_8);

                return HttpTool.httpPost(httpClient, baseUriProvider.get(), ImmutableMap.of("Content-Type", "application/json"), body);
            }
            return null;
        }
    };

    // The handler for the registration job
    PollHandler<? super HttpToolResponse> registrationHandler = new PollHandler<HttpToolResponse>() {
        @Override
        public void onSuccess(HttpToolResponse val) {
            if (registered.get() || val == null) {
                return; // Skip if we are registered already or no data from job
            }
            JsonObject response = HttpValueFunctions.jsonContents().apply(val).getAsJsonObject();
            if (response.has("error")) {
                // Parse the JSON error object and log the message
                JsonObject error = response.get("error").getAsJsonObject();
                String message = error.get("message").getAsString();
                String data = error.get("data").getAsString();
                log.warn("zabbix failed registering host - {}: {}", message, data);
            } else if (response.has("result")) {
                // Parse the JSON result object and save the hostId
                JsonObject result = response.get("result").getAsJsonObject();
                String hostId = result.get("hostids").getAsJsonArray().get(0).getAsString();
                // Update the registered status if not set
                if (registered.compareAndSet(false, true)) {
                    entity.sensors().set(ZabbixMonitored.ZABBIX_AGENT_HOSTID, hostId);
                    log.info("zabbix registered host as id {}", hostId);
                }
            } else {
                throw new IllegalStateException(String.format("zabbix host registration returned invalid result: %s", response.toString()));
            }
        }

        @Override
        public boolean checkSuccess(HttpToolResponse val) {
            // The registration job returns null when it has nothing to do; that is not a
            // failure and must not be dereferenced. onSuccess ignores null values.
            return val == null || val.getResponseCode() == 200;
        }

        @Override
        public void onFailure(HttpToolResponse val) {
            log.warn("zabbix server returned failure code: {}", val.getResponseCode());
        }

        @Override
        public void onException(Exception exception) {
            log.warn("zabbix exception registering host", exception);
        }

        @Override
        public String toString() {
            return super.toString()+"["+getDescription()+"]";
        }

        @Override
        public String getDescription() {
            return "Zabbix rest poll";
        }
    };

    // Schedule registration attempt once per second
    getPoller().scheduleAtFixedRate(registerJob, registrationHandler, 1000L); // TODO make configurable

    // Create a polling job for each Zabbix metric
    for (final ZabbixPollConfig<?> config : polls) {
        Callable<HttpToolResponse> pollJob = new Callable<HttpToolResponse>() {
            @Override
            public HttpToolResponse call() throws Exception {
                if (registered.get()) {
                    if (log.isTraceEnabled()) log.trace("zabbix polling {} for {}", entity, config);
                    // Encoded explicitly as UTF-8 rather than with the platform default charset
                    byte[] body = JSON_ITEM_GET
                            .replace("{{token}}", entity.getConfig(ZabbixMonitored.ZABBIX_SERVER).getAttribute(ZabbixServer.ZABBIX_TOKEN))
                            .replace("{{hostId}}", entity.getAttribute(ZabbixMonitored.ZABBIX_AGENT_HOSTID))
                            .replace("{{itemKey}}", config.getItemKey())
                            .replace("{{id}}", Integer.toString(id.incrementAndGet()))
                            .getBytes(java.nio.charset.StandardCharsets.UTF_8);

                    return HttpTool.httpPost(httpClient, baseUriProvider.get(), ImmutableMap.of("Content-Type", "application/json"), body);
                } else {
                    throw new IllegalStateException("zabbix agent not yet registered");
                }
            }
        };

        // Schedule the Zabbix polling job
        AttributePollHandler<? super HttpToolResponse> pollHandler = new AttributePollHandler<HttpToolResponse>(config, entity, this);
        // Falls back to Integer.MAX_VALUE when the poll config has no positive period
        long minPeriod = Integer.MAX_VALUE; // TODO make configurable
        if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
        getPoller().scheduleAtFixedRate(pollJob, pollHandler, minPeriod);
    }
}