protected void preStart()

in sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixFeed.java [318:457]


    protected void preStart() {
        final Supplier<URI> baseUriProvider = getConfig(BASE_URI_PROVIDER);
        final Function<? super EntityLocal, String> uniqueHostnameGenerator = getConfig(UNIQUE_HOSTNAME_GENERATOR);
        final Integer groupId = getConfig(GROUP_ID);
        final Integer templateId = getConfig(TEMPLATE_ID);
        final Set<ZabbixPollConfig<?>> polls = getConfig(POLLS);
        
        log.info("starting zabbix feed for {}", entity);

        // TODO if supplier returns null, we may wish to defer initialization until url available?
        // TODO for https should we really trust all?
        final HttpClient httpClient = HttpTool.httpClientBuilder()
                .trustAll()
                .clientConnectionManager(new ThreadSafeClientConnManager())
                .reuseStrategy(new NoConnectionReuseStrategy())
                .uri(baseUriProvider.get())
                .build();

        // Registration job, calls Zabbix host.create API
        final Callable<HttpToolResponse> registerJob = new Callable<HttpToolResponse>() {
            @Override
            public HttpToolResponse call() throws Exception {
                if (!registered.get()) {
                    // Find the first machine, if available
                    Optional<Location> location = Iterables.tryFind(entity.getLocations(), Predicates.instanceOf(MachineLocation.class));
                    if (!location.isPresent()) {
                        return null; // Do nothing until location is present
                    }
                    MachineLocation machine = (MachineLocation) location.get();

                    String host = uniqueHostnameGenerator.apply(entity);
                    
                    // Select address and port using port-forwarding if available
                    String address = entity.getAttribute(Attributes.ADDRESS);
                    Integer port = entity.getAttribute(ZabbixMonitored.ZABBIX_AGENT_PORT);
                    if (machine instanceof SupportsPortForwarding) {
                        Cidr management = entity.getConfig(BrooklynAccessUtils.MANAGEMENT_ACCESS_CIDR);
                        HostAndPort forwarded = ((SupportsPortForwarding) machine).getSocketEndpointFor(management, port);
                        address = forwarded.getHost();
                        port = forwarded.getPort();
                    }

                    // Fill in the JSON template and POST it
                    byte[] body = JSON_HOST_CREATE
                            .replace("{{token}}", entity.getConfig(ZabbixMonitored.ZABBIX_SERVER).getAttribute(ZabbixServer.ZABBIX_TOKEN))
                            .replace("{{host}}", host)
                            .replace("{{ip}}", address)
                            .replace("{{port}}", Integer.toString(port))
                            .replace("{{groupId}}", Integer.toString(groupId))
                            .replace("{{templateId}}", Integer.toString(templateId))
                            .replace("{{id}}", Integer.toString(id.incrementAndGet()))
                            .getBytes();
                    
                    return HttpTool.httpPost(httpClient, baseUriProvider.get(), ImmutableMap.of("Content-Type", "application/json"), body);
                }
                return null;
            }
        };

        // The handler for the registration job
        PollHandler<? super HttpToolResponse> registrationHandler = new PollHandler<HttpToolResponse>() {
            @Override
            public void onSuccess(HttpToolResponse val) {
                if (registered.get() || val == null) {
                    return; // Skip if we are registered already or no data from job
                }
                JsonObject response = HttpValueFunctions.jsonContents().apply(val).getAsJsonObject();
                if (response.has("error")) {
                    // Parse the JSON error object and log the message
                    JsonObject error = response.get("error").getAsJsonObject();
                    String message = error.get("message").getAsString();
                    String data = error.get("data").getAsString();
                    log.warn("zabbix failed registering host - {}: {}", message, data);
                } else if (response.has("result")) {
                    // Parse the JSON result object and save the hostId
                    JsonObject result = response.get("result").getAsJsonObject();
                    String hostId = result.get("hostids").getAsJsonArray().get(0).getAsString();
                    // Update the registered status if not set
                    if (registered.compareAndSet(false, true)) {
                        entity.sensors().set(ZabbixMonitored.ZABBIX_AGENT_HOSTID, hostId);
                        log.info("zabbix registered host as id {}", hostId);
                    }
                } else {
                    throw new IllegalStateException(String.format("zabbix host registration returned invalid result: %s", response.toString()));
                }
            }
            @Override
            public boolean checkSuccess(HttpToolResponse val) {
                return (val.getResponseCode() == 200);
            }
            @Override
            public void onFailure(HttpToolResponse val) {
                log.warn("zabbix sever returned failure code: {}", val.getResponseCode());
            }
            @Override
            public void onException(Exception exception) {
                log.warn("zabbix exception registering host", exception);
            }
            @Override
            public String toString() {
                return super.toString()+"["+getDescription()+"]";
            }
            @Override
            public String getDescription() {
                return "Zabbix rest poll";
            }
        };

        // Schedule registration attempt once per second
        getPoller().scheduleAtFixedRate(registerJob, registrationHandler, 1000l); // TODO make configurable

        // Create a polling job for each Zabbix metric
        for (final ZabbixPollConfig<?> config : polls) {
            Callable<HttpToolResponse> pollJob = new Callable<HttpToolResponse>() {
                @Override
                public HttpToolResponse call() throws Exception {
                    if (registered.get()) {
                        if (log.isTraceEnabled()) log.trace("zabbix polling {} for {}", entity, config);
                        byte[] body = JSON_ITEM_GET
                                .replace("{{token}}", entity.getConfig(ZabbixMonitored.ZABBIX_SERVER).getAttribute(ZabbixServer.ZABBIX_TOKEN))
                                .replace("{{hostId}}", entity.getAttribute(ZabbixMonitored.ZABBIX_AGENT_HOSTID))
                                .replace("{{itemKey}}", config.getItemKey())
                                .replace("{{id}}", Integer.toString(id.incrementAndGet()))
                                .getBytes();
                        
                        return HttpTool.httpPost(httpClient, baseUriProvider.get(), ImmutableMap.of("Content-Type", "application/json"), body);
                    } else {
                        throw new IllegalStateException("zabbix agent not yet registered");
                    }
                }
            };

            // Schedule the Zabbix polling job
            AttributePollHandler<? super HttpToolResponse> pollHandler = new AttributePollHandler<HttpToolResponse>(config, entity, this);
            long minPeriod = Integer.MAX_VALUE; // TODO make configurable
            if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
            getPoller().scheduleAtFixedRate(pollJob, pollHandler, minPeriod);
        }

    }