in streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/iolink/IfmAlMqttAdapter.java [96:175]
public void onAdapterStarted(IAdapterParameterExtractor extractor,
                             IEventCollector collector,
                             IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
  // Applies the adapter configuration, then starts an MQTT consumer on a new thread.
  //
  // extractor             - source of the static properties passed to applyConfiguration
  // collector             - receives one parsed event per configured port that carries data
  // adapterRuntimeContext - provides the adapter logger used for warnings and errors
  //
  // For every received MQTT message the payload is parsed and, for each entry in `ports`,
  // the sensor data found under the port-specific payload key is decoded and forwarded
  // to the collector. A missing port entry and a missing "data" field are each reported
  // only the first time they occur (guarded by the two *Detected flags).
  var sensor = new SensorVVB001();
  this.applyConfiguration(extractor.getStaticPropertyExtractor());

  this.mqttConsumer = new MqttConsumer(
      this.mqttConfig,
      (mqttEvent) -> {
        try {
          InputStream in = convertByte(mqttEvent);
          parser.parse(in, (event) -> {
            var data = getMap(event, "data");
            var payload = getMap(data, "payload");
            var deviceInfo = getMap(payload, "/deviceinfo/serialnumber");
            // NOTE(review): getMap is not visible here; if it can return null for a missing
            // key, this dereference (and payload.containsKey below) would throw - confirm.
            var serialnumber = deviceInfo.get("data");

            for (int i = 0; i < ports.size(); i++) {
              // The configured port identifier, as opposed to the loop index i. All log
              // messages report this value so they match the "port" field of the events.
              var port = ports.get(i);
              String keyPortInformation = KEY_PORT_INFORMATION.formatted(port);

              if (!payload.containsKey(keyPortInformation)) {
                if (!missingPortInformationDetected) {
                  adapterRuntimeContext
                      .getLogger()
                      .warn("Event does not contain information about port " + port, "");
                  LOG.error("IoLink event does not look like expected. No port information found for port {}.",
                      port);
                  missingPortInformationDetected = true;
                }
                continue;
              }

              Map<String, Object> portResult = getMap(payload, keyPortInformation);
              try {
                if (!portResult.containsKey("data")) {
                  if (!missingEventDataDetected) {
                    adapterRuntimeContext
                        .getLogger()
                        .warn("Payload for port %s does not contain event data".formatted(port), "");
                    LOG.error(
                        "IoLink event does not look like expected. "
                            + "No event data found for port {}.", port);
                    missingEventDataDetected = true;
                  }
                  continue;
                }

                String eventData = (String) portResult.get("data");
                var parsedEvent = sensor.parseEvent(eventData);
                // Offset by the loop index; presumably so that events emitted from the same
                // MQTT message for different ports do not share a timestamp - confirm.
                parsedEvent.put("timestamp", System.currentTimeMillis() + i);
                parsedEvent.put("port", "port" + port);
                parsedEvent.put(SensorVVB001.IO_LINK_MASTER_SN, serialnumber);
                collector.collect(parsedEvent);
              } catch (Exception e) {
                // Boundary catch: a failure for one port must not stop the remaining ports.
                adapterRuntimeContext
                    .getLogger()
                    .error(e);
                // The exception is passed as the trailing argument without a placeholder so
                // that SLF4J logs it as a throwable including its stack trace.
                LOG.error("Data from IOLink sensor could not be extracted for port {}", port, e);
              }
            }
          });
        } catch (ParseException e) {
          adapterRuntimeContext
              .getLogger()
              .error(e);
          LOG.error("IOLink master event could not be parsed.", e);
        }
      }
  );

  Thread thread = new Thread(this.mqttConsumer);
  thread.start();
}