in libminifi/src/core/flow/StructuredConfiguration.cpp [304:416]
/**
 * Builds one remote process group (RPG) per entry of the given sequence node
 * and attaches each of them to the parent process group.
 *
 * For every entry the function reads the name and id, the URL, and the optional
 * yield period, timeout, local network interface and transport protocol
 * settings, then parses the input and output port lists via parseRPGPort().
 * Proxy settings are only read when the transport protocol is "HTTP" and a
 * proxy host is configured.
 *
 * @param rpg_node_seq  node expected to be a sequence of RPG definitions; if it
 *                      is absent or not a sequence the function returns without
 *                      doing anything
 * @param parentGroup   group that receives the created RPGs; if null an error
 *                      is logged and nothing is parsed
 *
 * @throws minifi::Exception (SITE2SITE_EXCEPTION) when a transport protocol
 *         other than "HTTP" or "RAW" is configured.
 *
 * NOTE(review): checkRequiredField() and the `.value()` accessors presumably
 * raise on missing / wrongly typed fields - confirm against their definitions.
 */
void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq, core::ProcessGroup* parentGroup) {
  if (!parentGroup) {
    logger_->log_error("parseRemoteProcessGroup: no parent group exists");
    return;
  }

  if (!rpg_node_seq || !rpg_node_seq.isSequence()) {
    return;
  }

  for (const auto& currRpgNode : rpg_node_seq) {
    checkRequiredField(currRpgNode, schema_.name);
    auto name = currRpgNode[schema_.name].getString().value();
    std::string id = getOrGenerateId(currRpgNode);

    logger_->log_debug("parseRemoteProcessGroup: name => [%s], id => [%s]", name, id);

    auto url = getOptionalField(currRpgNode, schema_.rpg_url, "");
    logger_->log_debug("parseRemoteProcessGroup: url => [%s]", url);

    utils::Identifier uuid;
    uuid = id;
    auto group = createRemoteProcessGroup(name, uuid);
    // `group` is dereferenced unconditionally from here on, so no later null checks are performed.
    group->setParent(parentGroup);

    if (currRpgNode[schema_.rpg_yield_period]) {
      auto yieldPeriod = currRpgNode[schema_.rpg_yield_period].getString().value();
      logger_->log_debug("parseRemoteProcessGroup: yield period => [%s]", yieldPeriod);

      auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod);
      // An unparsable duration is ignored and the group keeps its existing yield period.
      if (yield_period_value.has_value()) {
        logger_->log_debug("parseRemoteProcessGroup: yieldPeriod => [%" PRId64 "] ms", yield_period_value->count());
        group->setYieldPeriodMsec(*yield_period_value);
      }
    }

    if (currRpgNode[schema_.rpg_timeout]) {
      auto timeout = currRpgNode[schema_.rpg_timeout].getString().value();
      logger_->log_debug("parseRemoteProcessGroup: timeout => [%s]", timeout);

      auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout);
      // An unparsable duration is ignored and the group keeps its existing timeout.
      if (timeout_value.has_value()) {
        logger_->log_debug("parseRemoteProcessGroup: timeoutValue => [%" PRId64 "] ms", timeout_value->count());
        group->setTimeout(timeout_value->count());
      }
    }

    if (currRpgNode[schema_.rpg_local_network_interface]) {
      // Not named `interface`: that identifier is a macro in Windows COM headers.
      auto local_interface = currRpgNode[schema_.rpg_local_network_interface].getString().value();
      logger_->log_debug("parseRemoteProcessGroup: local network interface => [%s]", local_interface);
      group->setInterface(local_interface);
    }

    if (currRpgNode[schema_.rpg_transport_protocol]) {
      auto transport_protocol = currRpgNode[schema_.rpg_transport_protocol].getString().value();
      logger_->log_debug("parseRemoteProcessGroup: transport protocol => [%s]", transport_protocol);

      if (transport_protocol == "HTTP") {
        group->setTransportProtocol(transport_protocol);

        // Proxy user, password and port are only read when a proxy host is configured.
        if (currRpgNode[schema_.rpg_proxy_host]) {
          auto http_proxy_host = currRpgNode[schema_.rpg_proxy_host].getString().value();
          logger_->log_debug("parseRemoteProcessGroup: proxy host => [%s]", http_proxy_host);
          group->setHttpProxyHost(http_proxy_host);

          if (currRpgNode[schema_.rpg_proxy_user]) {
            auto http_proxy_username = currRpgNode[schema_.rpg_proxy_user].getString().value();
            logger_->log_debug("parseRemoteProcessGroup: proxy user => [%s]", http_proxy_username);
            group->setHttpProxyUserName(http_proxy_username);
          }

          if (currRpgNode[schema_.rpg_proxy_password]) {
            auto http_proxy_password = currRpgNode[schema_.rpg_proxy_password].getString().value();
            // Never write the credential itself to the log; only record that one was configured.
            logger_->log_debug("parseRemoteProcessGroup: proxy password => [redacted]");
            group->setHttpProxyPassWord(http_proxy_password);
          }

          if (currRpgNode[schema_.rpg_proxy_port]) {
            auto http_proxy_port = currRpgNode[schema_.rpg_proxy_port].getIntegerAsString().value();
            int32_t port;
            // A port that cannot be converted to an integer is ignored.
            if (core::Property::StringToInt(http_proxy_port, port)) {
              logger_->log_debug("parseRemoteProcessGroup: proxy port => [%d]", port);
              group->setHttpProxyPort(port);
            }
          }
        }
      } else if (transport_protocol == "RAW") {
        group->setTransportProtocol(transport_protocol);
      } else {
        std::stringstream stream;
        stream << "Invalid transport protocol " << transport_protocol;
        throw minifi::Exception(ExceptionType::SITE2SITE_EXCEPTION, stream.str().c_str());
      }
    }

    group->setTransmitting(true);
    group->setURL(url);

    // The input ports field must be present; the output ports field is optional.
    checkRequiredField(currRpgNode, schema_.rpg_input_ports);
    auto inputPorts = currRpgNode[schema_.rpg_input_ports];
    if (inputPorts && inputPorts.isSequence()) {
      for (const auto& currPort : inputPorts) {
        parseRPGPort(currPort, group.get(), sitetosite::SEND);
      }
    }

    auto outputPorts = currRpgNode[schema_.rpg_output_ports];
    if (outputPorts && outputPorts.isSequence()) {
      for (const auto& currPort : outputPorts) {
        logger_->log_debug("Got a current port, iterating...");
        parseRPGPort(currPort, group.get(), sitetosite::RECEIVE);
      }
    }

    // Ownership of the fully configured group is handed over to the parent.
    parentGroup->addProcessGroup(std::move(group));
  }
}