void PutOPCProcessor::onTrigger()

in extensions/opc/src/putopc.cpp [77:331]


  void PutOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
    logger_->log_trace("PutOPCProcessor::onTrigger");

    if (!reconnect()) {
      yield();
      return;
    }

    if (!parentExists_) {
      if (idType_ == opc::OPCNodeIDType::Path) {
        std::vector<UA_NodeId> translatedNodeIDs;
        if (connection_->translateBrowsePathsToNodeIdsRequest(nodeID_, translatedNodeIDs, logger_) !=
            UA_STATUSCODE_GOOD) {
          logger_->log_error("Failed to translate %s to node id, no flow files will be put", nodeID_.c_str());
          yield();
          return;
        } else if (translatedNodeIDs.size() != 1) {
          logger_->log_error("%s was translated to multiple node ids, no flow files will be put", nodeID_.c_str());
          yield();
          return;
        } else {
          parentNodeID_ = translatedNodeIDs[0];
          parentExists_ = true;
        }
      } else {
        parentNodeID_.namespaceIndex = nameSpaceIdx_;
        if (idType_ == opc::OPCNodeIDType::Int) {
          parentNodeID_.identifierType = UA_NODEIDTYPE_NUMERIC;
          parentNodeID_.identifier.numeric = std::stoi(nodeID_);
        } else {  // idType_ == opc::OPCNodeIDType::String
          parentNodeID_.identifierType = UA_NODEIDTYPE_STRING;
          parentNodeID_.identifier.string = UA_STRING_ALLOC(nodeID_.c_str());
        }
        if (!connection_->exists(parentNodeID_)) {
          logger_->log_error("Parent node doesn't exist, no flow files will be put");
          yield();
          return;
        }
        parentExists_ = true;
      }
    }

    auto flowFile = session->get();

    // Do nothing if there are no incoming files
    if (!flowFile) {
      return;
    }

    std::string targetidtype;

    bool targetNodeExists = false;
    bool targetNodeValid = false;
    UA_NodeId targetnode;

    if (context->getProperty(TargetNodeIDType, targetidtype, flowFile)) {
      std::string targetid;
      std::string namespaceidx;


      if (!context->getProperty(TargetNodeID, targetid, flowFile)) {
        logger_->log_error("Flowfile %s had target node ID type specified (%s) without ID, routing to failure!",
                           flowFile->getUUIDStr(), targetidtype);
        session->transfer(flowFile, Failure);
        return;
      }

      if (!context->getProperty(TargetNodeNameSpaceIndex, namespaceidx, flowFile)) {
        logger_->log_error(
            "Flowfile %s had target node ID type specified (%s) without namespace index, routing to failure!",
            flowFile->getUUIDStr(), targetidtype);
        session->transfer(flowFile, Failure);
        return;
      }
      int32_t nsi;
      try {
        nsi = std::stoi(namespaceidx);
      } catch (...) {
        logger_->log_error("Flowfile %s has invalid namespace index (%s), routing to failure!",
                           flowFile->getUUIDStr(), namespaceidx);
        session->transfer(flowFile, Failure);
        return;
      }

      targetnode.namespaceIndex = nsi;
      if (targetidtype == "Int") {
        targetnode.identifierType = UA_NODEIDTYPE_NUMERIC;
        try {
          targetnode.identifier.numeric = std::stoi(targetid);
          targetNodeValid = true;
        } catch (...) {
          logger_->log_error("Flowfile %s: target node ID is not a valid integer: %s. Routing to failure!",
                             flowFile->getUUIDStr(), targetid);
          session->transfer(flowFile, Failure);
          return;
        }
      } else if (targetidtype == "String") {
        targetnode.identifierType = UA_NODEIDTYPE_STRING;
        targetnode.identifier.string = UA_STRING_ALLOC(targetid.c_str());
        targetNodeValid = true;
      } else {
        logger_->log_error("Flowfile %s: target node ID type is invalid: %s. Routing to failure!",
                           flowFile->getUUIDStr(), targetidtype);
        session->transfer(flowFile, Failure);
        return;
      }
      targetNodeExists = connection_->exists(targetnode);
    }

    const auto contentstr = to_string(session->readBuffer(flowFile));
    if (targetNodeExists) {
      logger_->log_trace("Node exists, trying to update it");
      try {
        UA_StatusCode sc;
        switch (nodeDataType_) {
          case opc::OPCNodeDataType::Int64: {
            int64_t value = std::stoll(contentstr);
            sc = connection_->update_node(targetnode, value);
            break;
          }
          case opc::OPCNodeDataType::UInt64: {
            uint64_t value = std::stoull(contentstr);
            sc = connection_->update_node(targetnode, value);
            break;
          }
          case opc::OPCNodeDataType::Int32: {
            int32_t value = std::stoi(contentstr);
            sc = connection_->update_node(targetnode, value);
            break;
          }
          case opc::OPCNodeDataType::UInt32: {
            uint32_t value = std::stoul(contentstr);
            sc = connection_->update_node(targetnode, value);
            break;
          }
          case opc::OPCNodeDataType::Boolean: {
            const auto contentstr_parsed = utils::StringUtils::toBool(contentstr);
            if (contentstr_parsed) {
              sc = connection_->update_node(targetnode, contentstr_parsed.value());
            } else {
              throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be converted to bool");
            }
            break;
          }
          case opc::OPCNodeDataType::Float: {
            float value = std::stof(contentstr);
            sc = connection_->update_node(targetnode, value);
            break;
          }
          case opc::OPCNodeDataType::Double: {
            double value = std::stod(contentstr);
            sc = connection_->update_node(targetnode, value);
            break;
          }
          case opc::OPCNodeDataType::String: {
            sc = connection_->update_node(targetnode, contentstr);
            break;
          }
          default:
            throw opc::OPCException(GENERAL_EXCEPTION, "This should never happen!");
        }
        if (sc != UA_STATUSCODE_GOOD) {
          logger_->log_error("Failed to update node: %s", UA_StatusCode_name(sc));
          session->transfer(flowFile, Failure);
          return;
        }
      } catch (...) {
        std::string typestr;
        context->getProperty(ValueType, typestr);
        logger_->log_error("Failed to convert %s to data type %s", contentstr, typestr);
        session->transfer(flowFile, Failure);
        return;
      }
      logger_->log_trace("Node successfully updated!");
      session->transfer(flowFile, Success);
      return;
    } else {
      logger_->log_trace("Node doesn't exist, trying to create new node");
      std::string browsename;
      if (!context->getProperty(TargetNodeBrowseName, browsename, flowFile)) {
        logger_->log_error("Target node browse name is required for flowfile (%s) as new node is to be created",
                           flowFile->getUUIDStr());
        session->transfer(flowFile, Failure);
        return;
      }
      if (!targetNodeValid) {
        targetnode = UA_NODEID_NUMERIC(1, 0);
      }
      try {
        UA_StatusCode sc;
        UA_NodeId resultnode;
        switch (nodeDataType_) {
          case opc::OPCNodeDataType::Int64: {
            int64_t value = std::stoll(contentstr);
            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
            break;
          }
          case opc::OPCNodeDataType::UInt64: {
            uint64_t value = std::stoull(contentstr);
            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
            break;
          }
          case opc::OPCNodeDataType::Int32: {
            int32_t value = std::stoi(contentstr);
            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
            break;
          }
          case opc::OPCNodeDataType::UInt32: {
            uint32_t value = std::stoul(contentstr);
            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
            break;
          }
          case opc::OPCNodeDataType::Boolean: {
            const auto contentstr_parsed = utils::StringUtils::toBool(contentstr);
            if (contentstr_parsed) {
              sc = connection_->add_node(parentNodeID_, targetnode, browsename, contentstr_parsed.value(), &resultnode);
            } else {
              throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be converted to bool");
            }
            break;
          }
          case opc::OPCNodeDataType::Float: {
            float value = std::stof(contentstr);
            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
            break;
          }
          case opc::OPCNodeDataType::Double: {
            double value = std::stod(contentstr);
            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
            break;
          }
          case opc::OPCNodeDataType::String: {
            sc = connection_->add_node(parentNodeID_, targetnode, browsename, contentstr, &resultnode);
            break;
          }
          default:
            throw opc::OPCException(GENERAL_EXCEPTION, "This should never happen!");
        }
        if (sc != UA_STATUSCODE_GOOD) {
          logger_->log_error("Failed to create node: %s", UA_StatusCode_name(sc));
          session->transfer(flowFile, Failure);
          return;
        }
      } catch (...) {
        std::string typestr;
        context->getProperty(ValueType, typestr);
        logger_->log_error("Failed to convert %s to data type %s", contentstr, typestr);
        session->transfer(flowFile, Failure);
        return;
      }
      logger_->log_trace("Node successfully created!");
      session->transfer(flowFile, Success);
      return;
    }
  }