public void pullData()

in streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java [196:252]


  /**
   * Reads all configured Modbus nodes from the PLC in a single request and forwards the
   * values as one event.
   *
   * <p>A tag is added to the read request for every entry in {@code nodes}, using the node's
   * runtime name as the tag name and an address prefix derived from the node type
   * ({@code Coil}, {@code HoldingRegister}, {@code DiscreteInput}, {@code InputRegister}).
   * The request is executed synchronously. Nodes whose response code is not
   * {@link PlcResponseCode#OK} are logged and left out of the event; the event is passed to
   * the collector even if it ends up empty.
   *
   * <p>If the read itself fails or the thread is interrupted while waiting, the failure is
   * logged and the method returns without emitting an event.
   */
  public void pullData() {

    // Build one read request containing a tag for every configured node
    PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
    for (Map<String, String> node : this.nodes) {
      String runtimeName = node.get(PLC_NODE_RUNTIME_NAME);
      String address = node.get(PLC_NODE_ADDRESS);

      switch (node.get(PLC_NODE_TYPE)) {
        case "Coil" -> builder.addTagAddress(runtimeName, "coil:" + address);
        case "HoldingRegister" -> builder.addTagAddress(runtimeName, "holding-register:" + address);
        case "DiscreteInput" -> builder.addTagAddress(runtimeName, "discrete-input:" + address);
        case "InputRegister" -> builder.addTagAddress(runtimeName, "input-register:" + address);
      }
    }
    PlcReadRequest readRequest = builder.build();

    // Execute the request and wait for the result
    PlcReadResponse response;
    try {
      response = readRequest.execute().get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the calling thread can still observe the interruption
      Thread.currentThread().interrupt();
      LOG.error("Interrupted while reading from the PLC; no event is emitted for this poll", e);
      return;
    } catch (ExecutionException e) {
      // Without a response there is nothing to publish; bail out instead of dereferencing null
      LOG.error("Could not read from the PLC; no event is emitted for this poll", e);
      return;
    }

    // Create an event containing the values that were read successfully
    Map<String, Object> event = new HashMap<>();
    for (Map<String, String> node : this.nodes) {
      String runtimeName = node.get(PLC_NODE_RUNTIME_NAME);

      if (response.getResponseCode(runtimeName) == PlcResponseCode.OK) {
        switch (node.get(PLC_NODE_TYPE)) {
          // Coils and discrete inputs are read as booleans
          case "Coil", "DiscreteInput" -> event.put(runtimeName, response.getBoolean(runtimeName));
          // Input and holding registers are read as integers
          case "InputRegister", "HoldingRegister" -> event.put(runtimeName, response.getInteger(runtimeName));
        }
      } else {
        LOG.error("Error[" + runtimeName + "]: " + response.getResponseCode(runtimeName));
      }
    }

    // publish the final event
    collector.collect(event);
  }