void StructuredConfiguration::parseRemoteProcessGroup()

in libminifi/src/core/flow/StructuredConfiguration.cpp [441:552]


/**
 * Builds one remote process group per entry of `rpg_node_seq` and attaches it to `parentGroup`.
 *
 * For every entry the following is read from the node (field names come from `schema_`):
 *  - name (checked with checkRequiredField) and id (via getOrGenerateId),
 *  - optional URL (defaults to the empty string),
 *  - optional yield period and timeout, applied only when they parse as a duration,
 *  - optional local network interface,
 *  - optional transport protocol, which must be "HTTP" or "RAW"; HTTP proxy settings
 *    (user, password, port) are only consulted when the protocol is "HTTP" and a proxy host is given,
 *  - input ports (checked with checkRequiredField, parsed as sitetosite::SEND) and
 *    optional output ports (parsed as sitetosite::RECEIVE).
 *
 * @param rpg_node_seq sequence node describing the remote process groups; if it is missing or
 *                     not a sequence the function returns without doing anything.
 * @param parentGroup  group that receives the created remote process groups; if null an error
 *                     is logged and the function returns.
 * @throws minifi::Exception (SITE2SITE_EXCEPTION) when a transport protocol other than
 *         "HTTP" or "RAW" is configured.
 */
void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq, core::ProcessGroup* parentGroup) {
  if (!parentGroup) {
    logger_->log_error("parseRemoteProcessGroup: no parent group exists");
    return;
  }

  if (!rpg_node_seq || !rpg_node_seq.isSequence()) {
    return;
  }

  for (const auto& currRpgNode : rpg_node_seq) {
    checkRequiredField(currRpgNode, schema_.name);
    const auto name = currRpgNode[schema_.name].getString().value();
    // Per-iteration state: nothing from a previous entry may leak into this one.
    const std::string id = getOrGenerateId(currRpgNode);

    logger_->log_debug("parseRemoteProcessGroup: name => [{}], id => [{}]", name, id);

    const auto url = getOptionalField(currRpgNode, schema_.rpg_url, "");

    logger_->log_debug("parseRemoteProcessGroup: url => [{}]", url);

    utils::Identifier uuid;
    uuid = id;
    auto group = createRemoteProcessGroup(name, uuid);
    // `group` is dereferenced unconditionally from here on, so no later null checks are needed.
    group->setParent(parentGroup);

    if (currRpgNode[schema_.rpg_yield_period]) {
      const auto yieldPeriod = currRpgNode[schema_.rpg_yield_period].getString().value();
      logger_->log_debug("parseRemoteProcessGroup: yield period => [{}]", yieldPeriod);

      // An unparsable duration is ignored and the group keeps whatever yield period it was created with.
      const auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod);
      if (yield_period_value.has_value()) {
        logger_->log_debug("parseRemoteProcessGroup: yieldPeriod => [{}]", yield_period_value);
        group->setYieldPeriodMsec(*yield_period_value);
      }
    }

    if (currRpgNode[schema_.rpg_timeout]) {
      const auto timeout = currRpgNode[schema_.rpg_timeout].getString().value();
      logger_->log_debug("parseRemoteProcessGroup: timeout => [{}]", timeout);

      // An unparsable duration is ignored and the group keeps whatever timeout it was created with.
      const auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout);
      if (timeout_value.has_value()) {
        logger_->log_debug("parseRemoteProcessGroup: timeoutValue => [{}]", timeout_value);
        group->setTimeout(timeout_value->count());
      }
    }

    if (currRpgNode[schema_.rpg_local_network_interface]) {
      // Not named `interface`: that identifier is a macro in Windows COM headers.
      const auto network_interface = currRpgNode[schema_.rpg_local_network_interface].getString().value();
      logger_->log_debug("parseRemoteProcessGroup: local network interface => [{}]", network_interface);
      group->setInterface(network_interface);
    }

    if (currRpgNode[schema_.rpg_transport_protocol]) {
      const auto transport_protocol = currRpgNode[schema_.rpg_transport_protocol].getString().value();
      logger_->log_debug("parseRemoteProcessGroup: transport protocol => [{}]", transport_protocol);
      if (transport_protocol == "HTTP") {
        group->setTransportProtocol(transport_protocol);
        // Proxy user, password and port are only read when a proxy host is configured.
        if (currRpgNode[schema_.rpg_proxy_host]) {
          const auto http_proxy_host = currRpgNode[schema_.rpg_proxy_host].getString().value();
          logger_->log_debug("parseRemoteProcessGroup: proxy host => [{}]", http_proxy_host);
          group->setHttpProxyHost(http_proxy_host);
          if (currRpgNode[schema_.rpg_proxy_user]) {
            const auto http_proxy_username = currRpgNode[schema_.rpg_proxy_user].getString().value();
            logger_->log_debug("parseRemoteProcessGroup: proxy user => [{}]", http_proxy_username);
            group->setHttpProxyUserName(http_proxy_username);
          }
          if (currRpgNode[schema_.rpg_proxy_password]) {
            const auto http_proxy_password = currRpgNode[schema_.rpg_proxy_password].getString().value();
            // Never write the credential itself to the log; only record that one was configured.
            logger_->log_debug("parseRemoteProcessGroup: proxy password is set");
            group->setHttpProxyPassWord(http_proxy_password);
          }
          if (currRpgNode[schema_.rpg_proxy_port]) {
            const auto http_proxy_port = currRpgNode[schema_.rpg_proxy_port].getIntegerAsString().value();
            // A port that does not parse as an int is skipped.
            if (auto port = parsing::parseIntegral<int>(http_proxy_port)) {
              logger_->log_debug("parseRemoteProcessGroup: proxy port => [{}]", *port);
              group->setHttpProxyPort(*port);
            }
          }
        }
      } else if (transport_protocol == "RAW") {
        group->setTransportProtocol(transport_protocol);
      } else {
        std::stringstream stream;
        stream << "Invalid transport protocol " << transport_protocol;
        throw minifi::Exception(ExceptionType::SITE2SITE_EXCEPTION, stream.str().c_str());
      }
    }

    group->setTransmitting(true);
    group->setURL(url);

    // Input ports go through checkRequiredField; output ports are optional.
    checkRequiredField(currRpgNode, schema_.rpg_input_ports);
    const auto inputPorts = currRpgNode[schema_.rpg_input_ports];
    if (inputPorts && inputPorts.isSequence()) {
      for (const auto& currPort : inputPorts) {
        parseRPGPort(currPort, group.get(), sitetosite::SEND);
      }
    }

    const auto outputPorts = currRpgNode[schema_.rpg_output_ports];
    if (outputPorts && outputPorts.isSequence()) {
      for (const auto& currPort : outputPorts) {
        logger_->log_debug("Got a current port, iterating...");

        parseRPGPort(currPort, group.get(), sitetosite::RECEIVE);
      }
    }

    // Ownership of the fully configured group is handed to the parent.
    parentGroup->addProcessGroup(std::move(group));
  }
}