in libminifi/src/core/flow/StructuredConfiguration.cpp [441:552]
void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq, core::ProcessGroup* parentGroup) {
utils::Identifier uuid;
std::string id;
if (!parentGroup) {
logger_->log_error("parseRemoteProcessGroup: no parent group exists");
return;
}
if (!rpg_node_seq || !rpg_node_seq.isSequence()) {
return;
}
for (const auto& currRpgNode : rpg_node_seq) {
checkRequiredField(currRpgNode, schema_.name);
auto name = currRpgNode[schema_.name].getString().value();
id = getOrGenerateId(currRpgNode);
logger_->log_debug("parseRemoteProcessGroup: name => [{}], id => [{}]", name, id);
auto url = getOptionalField(currRpgNode, schema_.rpg_url, "");
logger_->log_debug("parseRemoteProcessGroup: url => [{}]", url);
uuid = id;
auto group = createRemoteProcessGroup(name, uuid);
group->setParent(parentGroup);
if (currRpgNode[schema_.rpg_yield_period]) {
auto yieldPeriod = currRpgNode[schema_.rpg_yield_period].getString().value();
logger_->log_debug("parseRemoteProcessGroup: yield period => [{}]", yieldPeriod);
auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod);
if (yield_period_value.has_value() && group) {
logger_->log_debug("parseRemoteProcessGroup: yieldPeriod => [{}]", yield_period_value);
group->setYieldPeriodMsec(*yield_period_value);
}
}
if (currRpgNode[schema_.rpg_timeout]) {
auto timeout = currRpgNode[schema_.rpg_timeout].getString().value();
logger_->log_debug("parseRemoteProcessGroup: timeout => [{}]", timeout);
auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout);
if (timeout_value.has_value() && group) {
logger_->log_debug("parseRemoteProcessGroup: timeoutValue => [{}]", timeout_value);
group->setTimeout(timeout_value->count());
}
}
if (currRpgNode[schema_.rpg_local_network_interface]) {
auto interface = currRpgNode[schema_.rpg_local_network_interface].getString().value();
logger_->log_debug("parseRemoteProcessGroup: local network interface => [{}]", interface);
group->setInterface(interface);
}
if (currRpgNode[schema_.rpg_transport_protocol]) {
auto transport_protocol = currRpgNode[schema_.rpg_transport_protocol].getString().value();
logger_->log_debug("parseRemoteProcessGroup: transport protocol => [{}]", transport_protocol);
if (transport_protocol == "HTTP") {
group->setTransportProtocol(transport_protocol);
if (currRpgNode[schema_.rpg_proxy_host]) {
auto http_proxy_host = currRpgNode[schema_.rpg_proxy_host].getString().value();
logger_->log_debug("parseRemoteProcessGroup: proxy host => [{}]", http_proxy_host);
group->setHttpProxyHost(http_proxy_host);
if (currRpgNode[schema_.rpg_proxy_user]) {
auto http_proxy_username = currRpgNode[schema_.rpg_proxy_user].getString().value();
logger_->log_debug("parseRemoteProcessGroup: proxy user => [{}]", http_proxy_username);
group->setHttpProxyUserName(http_proxy_username);
}
if (currRpgNode[schema_.rpg_proxy_password]) {
auto http_proxy_password = currRpgNode[schema_.rpg_proxy_password].getString().value();
logger_->log_debug("parseRemoteProcessGroup: proxy password => [{}]", http_proxy_password);
group->setHttpProxyPassWord(http_proxy_password);
}
if (currRpgNode[schema_.rpg_proxy_port]) {
auto http_proxy_port = currRpgNode[schema_.rpg_proxy_port].getIntegerAsString().value();
if (auto port = parsing::parseIntegral<int>(http_proxy_port)) {
logger_->log_debug("parseRemoteProcessGroup: proxy port => [{}]", *port);
group->setHttpProxyPort(*port);
}
}
}
} else if (transport_protocol == "RAW") {
group->setTransportProtocol(transport_protocol);
} else {
std::stringstream stream;
stream << "Invalid transport protocol " << transport_protocol;
throw minifi::Exception(ExceptionType::SITE2SITE_EXCEPTION, stream.str().c_str());
}
}
group->setTransmitting(true);
group->setURL(url);
checkRequiredField(currRpgNode, schema_.rpg_input_ports);
auto inputPorts = currRpgNode[schema_.rpg_input_ports];
if (inputPorts && inputPorts.isSequence()) {
for (const auto& currPort : inputPorts) {
parseRPGPort(currPort, group.get(), sitetosite::SEND);
} // for node
}
auto outputPorts = currRpgNode[schema_.rpg_output_ports];
if (outputPorts && outputPorts.isSequence()) {
for (const auto& currPort : outputPorts) {
logger_->log_debug("Got a current port, iterating...");
parseRPGPort(currPort, group.get(), sitetosite::RECEIVE);
} // for node
}
parentGroup->addProcessGroup(std::move(group));
}
}