public void onAdapterStarted()

in streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/iolink/IfmAlMqttAdapter.java [96:175]


  public void onAdapterStarted(IAdapterParameterExtractor extractor,
                               IEventCollector collector,
                               IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
    var sensor = new SensorVVB001();

    this.applyConfiguration(extractor.getStaticPropertyExtractor());
    this.mqttConsumer = new MqttConsumer(
        this.mqttConfig,
        (mqttEvent) -> {
          try {
            InputStream in = convertByte(mqttEvent);
            parser.parse(in, (event) -> {

              var data = getMap(event, "data");
              var payload = getMap(data, "payload");

              var deviceInfo = getMap(payload, "/deviceinfo/serialnumber");
              var serialnumber = deviceInfo.get("data");

              for (int i = 0; i < ports.size(); i++) {

                String keyPortInformation = KEY_PORT_INFORMATION.formatted(ports.get(i));

                Map<String, Object> portResult;
                if (payload.containsKey(keyPortInformation)) {

                  portResult = getMap(payload, keyPortInformation);

                  try {
                    String eventData;
                    if (portResult.containsKey("data")) {
                      eventData = (String) portResult.get("data");

                      var parsedEvent = sensor.parseEvent(eventData);
                      parsedEvent.put("timestamp", System.currentTimeMillis() + i);
                      parsedEvent.put("port", "port" + ports.get(i));
                      parsedEvent.put(SensorVVB001.IO_LINK_MASTER_SN, serialnumber);

                      collector.collect(parsedEvent);
                    } else {
                      if (!missingEventDataDetected) {
                        adapterRuntimeContext
                                .getLogger()
                                .warn("Payload for port %s does not contain event data".formatted(i), "");
                        LOG.error(
                                "IoLink event does not look like expected. "
                                + "No port information found for port {}.", i);
                        missingEventDataDetected = true;
                      }
                    }
                  } catch (Exception e) {
                    adapterRuntimeContext
                            .getLogger()
                            .error(e);
                    LOG.error("Data from IOLink sensor could not be extracted for port {}: {}", i, e);
                  }

                } else {
                  if (!missingPortInformationDetected) {
                    adapterRuntimeContext
                            .getLogger()
                            .warn("Event does not contain information about port " + i, "");
                    LOG.error("IoLink event does not look like expected. No port information found for port {}.", i);
                    missingPortInformationDetected = true;
                  }
                }
              }
            });
          } catch (ParseException e) {
            adapterRuntimeContext
                    .getLogger()
                    .error(e);
            LOG.error("IOLink master event could not be parsed.", e);
          }
        }
        );

    Thread thread = new Thread(this.mqttConsumer);
    thread.start();
  }