in extensions/opc/src/putopc.cpp [77:331]
/**
 * Writes the content of one incoming flow file to an OPC UA variable node.
 *
 * Steps, in order:
 *  1. Call reconnect(); yield and return if it reports failure.
 *  2. Resolve the configured parent node once; the result is cached in
 *     parentNodeID_ / parentExists_ and reused by later invocations.
 *  3. Fetch one flow file; return without doing anything if none is queued.
 *  4. If the TargetNodeIDType property is set, build the target node id from the
 *     TargetNodeID and TargetNodeNameSpaceIndex properties and ask the server
 *     whether that node exists.
 *  5. Convert the flow file content to nodeDataType_ and either update the
 *     existing node, or create a new node below the parent (this requires the
 *     TargetNodeBrowseName property).
 *
 * The flow file is transferred to Success when the server answers with
 * UA_STATUSCODE_GOOD, and to Failure on missing/invalid properties, on content
 * that cannot be converted, and on any other status code.
 *
 * @param context source of the processor properties (evaluated against the flow file)
 * @param session used to fetch, read and route the flow file
 */
void PutOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  logger_->log_trace("PutOPCProcessor::onTrigger");

  if (!reconnect()) {
    yield();
    return;
  }

  if (!parentExists_) {
    if (idType_ == opc::OPCNodeIDType::Path) {
      std::vector<UA_NodeId> translatedNodeIDs;
      if (connection_->translateBrowsePathsToNodeIdsRequest(nodeID_, translatedNodeIDs, logger_) !=
          UA_STATUSCODE_GOOD) {
        logger_->log_error("Failed to translate %s to node id, no flow files will be put", nodeID_.c_str());
        yield();
        return;
      }
      if (translatedNodeIDs.empty()) {
        // A successful request with no match is not "multiple node ids"; report it as such.
        logger_->log_error("%s was not translated to any node id, no flow files will be put", nodeID_.c_str());
        yield();
        return;
      }
      if (translatedNodeIDs.size() != 1) {
        logger_->log_error("%s was translated to multiple node ids, no flow files will be put", nodeID_.c_str());
        yield();
        return;
      }
      parentNodeID_ = translatedNodeIDs[0];
      parentExists_ = true;
    } else {
      // Build the id in a local first. parentNodeID_ is only written once the node is
      // known to exist, so repeated failed attempts do not allocate a new string each time.
      UA_NodeId candidate{};
      candidate.namespaceIndex = nameSpaceIdx_;
      const bool numericParent = (idType_ == opc::OPCNodeIDType::Int);
      if (numericParent) {
        candidate.identifierType = UA_NODEIDTYPE_NUMERIC;
        try {
          candidate.identifier.numeric = std::stoi(nodeID_);
        } catch (...) {
          // std::stoi throws on non-numeric or out-of-range input; do not let that escape onTrigger.
          logger_->log_error("Parent node id %s is not a valid integer, no flow files will be put", nodeID_.c_str());
          yield();
          return;
        }
      } else {  // idType_ == opc::OPCNodeIDType::String
        candidate.identifierType = UA_NODEIDTYPE_STRING;
        // Non-owning view of nodeID_, used only for the existence check below.
        candidate.identifier.string = UA_STRING(const_cast<char*>(nodeID_.c_str()));
      }
      if (!connection_->exists(candidate)) {
        logger_->log_error("Parent node doesn't exist, no flow files will be put");
        yield();
        return;
      }
      if (!numericParent) {
        // The cached id outlives this call, so it gets its own copy of the string.
        // NOTE(review): nothing in this function releases a previously cached string id;
        // confirm parentNodeID_ is cleaned up wherever parentExists_ is reset.
        candidate.identifier.string = UA_STRING_ALLOC(nodeID_.c_str());
      }
      parentNodeID_ = candidate;
      parentExists_ = true;
    }
  }

  auto flowFile = session->get();
  // Do nothing if there are no incoming files
  if (!flowFile) {
    return;
  }

  std::string targetidtype;
  // Declared at function scope on purpose: for string ids, targetnode borrows this buffer
  // instead of owning a heap copy, so the string must stay alive until the function returns.
  std::string targetid;
  bool targetNodeExists = false;
  bool targetNodeValid = false;
  UA_NodeId targetnode{};

  if (context->getProperty(TargetNodeIDType, targetidtype, flowFile)) {
    std::string namespaceidx;
    if (!context->getProperty(TargetNodeID, targetid, flowFile)) {
      logger_->log_error("Flowfile %s had target node ID type specified (%s) without ID, routing to failure!",
                         flowFile->getUUIDStr(), targetidtype);
      session->transfer(flowFile, Failure);
      return;
    }
    if (!context->getProperty(TargetNodeNameSpaceIndex, namespaceidx, flowFile)) {
      logger_->log_error(
          "Flowfile %s had target node ID type specified (%s) without namespace index, routing to failure!",
          flowFile->getUUIDStr(), targetidtype);
      session->transfer(flowFile, Failure);
      return;
    }
    int32_t nsi;
    try {
      nsi = std::stoi(namespaceidx);
    } catch (...) {
      logger_->log_error("Flowfile %s has invalid namespace index (%s), routing to failure!",
                         flowFile->getUUIDStr(), namespaceidx);
      session->transfer(flowFile, Failure);
      return;
    }
    targetnode.namespaceIndex = nsi;
    if (targetidtype == "Int") {
      targetnode.identifierType = UA_NODEIDTYPE_NUMERIC;
      try {
        targetnode.identifier.numeric = std::stoi(targetid);
        targetNodeValid = true;
      } catch (...) {
        logger_->log_error("Flowfile %s: target node ID is not a valid integer: %s. Routing to failure!",
                           flowFile->getUUIDStr(), targetid);
        session->transfer(flowFile, Failure);
        return;
      }
    } else if (targetidtype == "String") {
      targetnode.identifierType = UA_NODEIDTYPE_STRING;
      // Borrow targetid's buffer rather than UA_STRING_ALLOC-ing a copy that no exit path
      // of this function would release (one leaked allocation per flow file).
      // NOTE(review): assumes the connection_ calls below neither keep nor free the id
      // after they return - confirm against the Client implementation.
      targetnode.identifier.string = UA_STRING(const_cast<char*>(targetid.c_str()));
      targetNodeValid = true;
    } else {
      logger_->log_error("Flowfile %s: target node ID type is invalid: %s. Routing to failure!",
                         flowFile->getUUIDStr(), targetidtype);
      session->transfer(flowFile, Failure);
      return;
    }
    targetNodeExists = connection_->exists(targetnode);
  }

  const auto contentstr = to_string(session->readBuffer(flowFile));

  // std::stoul / std::stoull accept a leading '-' and wrap the value around instead of
  // failing, so negative content is rejected explicitly for the unsigned data types.
  const auto ensureNotNegative = [](const std::string& text) {
    const auto firstNonSpace = text.find_first_not_of(" \t\n\v\f\r");
    if (firstNonSpace != std::string::npos && text[firstNonSpace] == '-') {
      throw opc::OPCException(GENERAL_EXCEPTION, "Negative content cannot be converted to an unsigned type");
    }
  };
  const auto parseUInt64 = [&ensureNotNegative](const std::string& text) -> uint64_t {
    ensureNotNegative(text);
    return std::stoull(text);
  };
  // std::stoul yields unsigned long, which may be wider than 32 bits; refuse values that
  // would be silently truncated by the narrowing conversion.
  const auto parseUInt32 = [&ensureNotNegative](const std::string& text) -> uint32_t {
    ensureNotNegative(text);
    const auto parsed = std::stoul(text);
    const auto narrowed = static_cast<uint32_t>(parsed);
    if (narrowed != parsed) {
      throw opc::OPCException(GENERAL_EXCEPTION, "Content is out of range for UInt32");
    }
    return narrowed;
  };

  // Shared handling for anything thrown while converting the content or talking to the
  // server inside the try blocks below: log the configured value type and route to Failure.
  const auto routeConversionFailure = [&]() {
    std::string typestr;
    context->getProperty(ValueType, typestr);
    logger_->log_error("Failed to convert %s to data type %s", contentstr, typestr);
    session->transfer(flowFile, Failure);
  };

  if (targetNodeExists) {
    logger_->log_trace("Node exists, trying to update it");
    try {
      UA_StatusCode sc;
      switch (nodeDataType_) {
        case opc::OPCNodeDataType::Int64: {
          int64_t value = std::stoll(contentstr);
          sc = connection_->update_node(targetnode, value);
          break;
        }
        case opc::OPCNodeDataType::UInt64: {
          uint64_t value = parseUInt64(contentstr);
          sc = connection_->update_node(targetnode, value);
          break;
        }
        case opc::OPCNodeDataType::Int32: {
          int32_t value = std::stoi(contentstr);
          sc = connection_->update_node(targetnode, value);
          break;
        }
        case opc::OPCNodeDataType::UInt32: {
          uint32_t value = parseUInt32(contentstr);
          sc = connection_->update_node(targetnode, value);
          break;
        }
        case opc::OPCNodeDataType::Boolean: {
          const auto contentstr_parsed = utils::StringUtils::toBool(contentstr);
          if (contentstr_parsed) {
            sc = connection_->update_node(targetnode, contentstr_parsed.value());
          } else {
            throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be converted to bool");
          }
          break;
        }
        case opc::OPCNodeDataType::Float: {
          float value = std::stof(contentstr);
          sc = connection_->update_node(targetnode, value);
          break;
        }
        case opc::OPCNodeDataType::Double: {
          double value = std::stod(contentstr);
          sc = connection_->update_node(targetnode, value);
          break;
        }
        case opc::OPCNodeDataType::String: {
          sc = connection_->update_node(targetnode, contentstr);
          break;
        }
        default:
          throw opc::OPCException(GENERAL_EXCEPTION, "This should never happen!");
      }
      if (sc != UA_STATUSCODE_GOOD) {
        logger_->log_error("Failed to update node: %s", UA_StatusCode_name(sc));
        session->transfer(flowFile, Failure);
        return;
      }
    } catch (...) {
      routeConversionFailure();
      return;
    }
    logger_->log_trace("Node successfully updated!");
    session->transfer(flowFile, Success);
    return;
  }

  logger_->log_trace("Node doesn't exist, trying to create new node");
  std::string browsename;
  if (!context->getProperty(TargetNodeBrowseName, browsename, flowFile)) {
    logger_->log_error("Target node browse name is required for flowfile (%s) as new node is to be created",
                       flowFile->getUUIDStr());
    session->transfer(flowFile, Failure);
    return;
  }
  if (!targetNodeValid) {
    // No target id was requested by the flow file: pass numeric id 0 in namespace 1.
    targetnode = UA_NODEID_NUMERIC(1, 0);
  }
  try {
    UA_StatusCode sc;
    // Receives the id reported back by add_node; it is not read afterwards in this function.
    UA_NodeId resultnode;
    switch (nodeDataType_) {
      case opc::OPCNodeDataType::Int64: {
        int64_t value = std::stoll(contentstr);
        sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
        break;
      }
      case opc::OPCNodeDataType::UInt64: {
        uint64_t value = parseUInt64(contentstr);
        sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
        break;
      }
      case opc::OPCNodeDataType::Int32: {
        int32_t value = std::stoi(contentstr);
        sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
        break;
      }
      case opc::OPCNodeDataType::UInt32: {
        uint32_t value = parseUInt32(contentstr);
        sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
        break;
      }
      case opc::OPCNodeDataType::Boolean: {
        const auto contentstr_parsed = utils::StringUtils::toBool(contentstr);
        if (contentstr_parsed) {
          sc = connection_->add_node(parentNodeID_, targetnode, browsename, contentstr_parsed.value(), &resultnode);
        } else {
          throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be converted to bool");
        }
        break;
      }
      case opc::OPCNodeDataType::Float: {
        float value = std::stof(contentstr);
        sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
        break;
      }
      case opc::OPCNodeDataType::Double: {
        double value = std::stod(contentstr);
        sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
        break;
      }
      case opc::OPCNodeDataType::String: {
        sc = connection_->add_node(parentNodeID_, targetnode, browsename, contentstr, &resultnode);
        break;
      }
      default:
        throw opc::OPCException(GENERAL_EXCEPTION, "This should never happen!");
    }
    if (sc != UA_STATUSCODE_GOOD) {
      logger_->log_error("Failed to create node: %s", UA_StatusCode_name(sc));
      session->transfer(flowFile, Failure);
      return;
    }
  } catch (...) {
    routeConversionFailure();
    return;
  }
  logger_->log_trace("Node successfully created!");
  session->transfer(flowFile, Success);
}