public void activate()

in collector/mqtt/src/main/java/org/apache/karaf/decanter/collector/mqtt/MqttCollector.java [63:119]


    /**
     * Activates the collector: reads the component configuration, creates the MQTT client,
     * registers the message callback, connects to the broker and subscribes to the topic.
     *
     * <p>Configuration keys read from the component properties (with their defaults):
     * <ul>
     *   <li>{@code server.uri} - broker URI ({@code tcp://localhost:1883})</li>
     *   <li>{@code client.id} - MQTT client id ({@code d:decanter:collector:default})</li>
     *   <li>{@code topic} - MQTT topic to subscribe to ({@code decanter})</li>
     *   <li>{@code userName} / {@code password} - optional credentials (unset by default)</li>
     *   <li>{@link EventConstants#EVENT_TOPIC} - dispatcher topic the events are posted to
     *       ({@code decanter/collect/mqtt/decanter})</li>
     * </ul>
     *
     * <p>Each message with a non-null payload is unmarshalled, tagged with
     * {@code type=mqtt}, enriched via {@code PropertiesPreparator} and posted to the dispatcher.
     *
     * @param componentContext the component context providing the configuration properties
     * @throws Exception if the client cannot be created, connected or subscribed; when connect
     *                   or subscribe fails the client is released before the exception is rethrown
     */
    public void activate(ComponentContext componentContext) throws Exception {
        properties = componentContext.getProperties();
        String serverUri = getProperty(properties, "server.uri", "tcp://localhost:1883");
        String clientId = getProperty(properties, "client.id", "d:decanter:collector:default");
        String topic = getProperty(properties, "topic", "decanter");
        String username = getProperty(properties, "userName", null);
        String password = getProperty(properties, "password", null);
        dispatcherTopic = getProperty(properties, EventConstants.EVENT_TOPIC, "decanter/collect/mqtt/decanter");

        client = new MqttClient(serverUri, clientId);

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        // Credentials are optional: only set them when configured.
        if (username != null) {
            options.setUserName(username);
        }
        if (password != null) {
            options.setPassword(password.toCharArray());
        }
        options.setConnectionTimeout(60);
        options.setAutomaticReconnect(true);
        options.setKeepAliveInterval(10);
        options.setExecutorServiceTimeout(30);

        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                LOGGER.debug("MQTT connection lost", cause);
            }

            @Override
            public void messageArrived(String sourceTopic, MqttMessage message) {
                if (message.getPayload() == null) {
                    return;
                }

                // An exception escaping this callback makes the Paho client shut down,
                // so a single bad message must be logged and dropped, not propagated.
                try {
                    Map<String, Object> data = new HashMap<>();
                    data.put("type", "mqtt");

                    ByteArrayInputStream is = new ByteArrayInputStream(message.getPayload());
                    data.putAll(unmarshaller.unmarshal(is));

                    // Preparation is best-effort: the event is still posted if it fails.
                    try {
                        PropertiesPreparator.prepare(data, properties);
                    } catch (Exception e) {
                        LOGGER.warn("Can't prepare data for the dispatcher", e);
                    }

                    Event event = new Event(dispatcherTopic, data);
                    dispatcher.postEvent(event);
                } catch (Exception e) {
                    LOGGER.warn("Can't process MQTT message received on topic " + sourceTopic, e);
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // nothing to do
            }
        });

        try {
            client.connect(options);
            client.subscribe(topic);
        } catch (Exception e) {
            // Activation failed: release the client so its resources are not leaked.
            try {
                if (client.isConnected()) {
                    client.disconnect();
                }
                client.close();
            } catch (Exception cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
    }