in collector/mqtt/src/main/java/org/apache/karaf/decanter/collector/mqtt/MqttCollector.java [63:119]
/**
 * Activates the collector: reads the component configuration, creates the MQTT
 * client, registers the message callback, then connects and subscribes.
 *
 * <p>Configuration keys read from the component properties (with the default used
 * when the key is absent): {@code server.uri} ({@code tcp://localhost:1883}),
 * {@code client.id} ({@code d:decanter:collector:default}), {@code topic}
 * ({@code decanter}), {@code userName} and {@code password} (no default, only
 * applied when present) and {@link EventConstants#EVENT_TOPIC}
 * ({@code decanter/collect/mqtt/decanter}).
 *
 * <p>Each received message with a non-null payload is unmarshalled, enriched
 * through {@code PropertiesPreparator} and posted to the dispatcher on the
 * configured dispatcher topic.
 *
 * @param componentContext the component context providing the configuration properties
 * @throws Exception if the client cannot be created, connected or subscribed; in the
 *                   connect/subscribe case the client is released before rethrowing
 */
public void activate(ComponentContext componentContext) throws Exception {
    properties = componentContext.getProperties();
    String serverUri = getProperty(properties, "server.uri", "tcp://localhost:1883");
    String clientId = getProperty(properties, "client.id", "d:decanter:collector:default");
    String topic = getProperty(properties, "topic", "decanter");
    String username = getProperty(properties, "userName", null);
    String password = getProperty(properties, "password", null);
    dispatcherTopic = getProperty(properties, EventConstants.EVENT_TOPIC, "decanter/collect/mqtt/decanter");

    client = new MqttClient(serverUri, clientId);

    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(true);
    // Credentials are optional: only set what has actually been configured.
    if (username != null) {
        options.setUserName(username);
    }
    if (password != null) {
        options.setPassword(password.toCharArray());
    }
    options.setConnectionTimeout(60);
    // NOTE(review): with a clean session the broker is expected to forget the
    // subscription when the connection drops, so an automatic reconnect may not
    // restore it - confirm whether a re-subscribe on reconnect is needed.
    options.setAutomaticReconnect(true);
    options.setKeepAliveInterval(10);
    options.setExecutorServiceTimeout(30);

    client.setCallback(new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            LOGGER.debug("MQTT connection lost", cause);
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) {
            if (message.getPayload() == null) {
                return;
            }
            // An exception escaping this callback makes the MQTT client shut down,
            // so a single bad payload must be logged and dropped, never propagated.
            try {
                Map<String, Object> data = new HashMap<>();
                data.put("type", "mqtt");
                ByteArrayInputStream is = new ByteArrayInputStream(message.getPayload());
                data.putAll(unmarshaller.unmarshal(is));
                try {
                    PropertiesPreparator.prepare(data, properties);
                } catch (Exception e) {
                    // Best effort: the event is still dispatched without the prepared properties.
                    LOGGER.warn("Can't prepare data for the dispatcher", e);
                }
                Event event = new Event(dispatcherTopic, data);
                dispatcher.postEvent(event);
            } catch (Exception e) {
                LOGGER.warn("Can't process MQTT message received on topic " + topic, e);
            }
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // nothing to do
        }
    });

    try {
        client.connect(options);
        client.subscribe(topic);
    } catch (Exception e) {
        // Activation failed: release the client so it does not linger, then
        // surface the original failure unchanged to the caller.
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        } catch (Exception cleanupFailure) {
            LOGGER.debug("Can't release MQTT client after failed activation", cleanupFailure);
        }
        throw e;
    }
}