std::pair RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo()

in libminifi/src/RemoteProcessorGroupPort.cpp [219:328]


std::pair<std::string, int> RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
  if (nifi_instances_.empty())
    return std::make_pair("", -1);

  for (const auto& nifi : nifi_instances_) {
    std::string host = nifi.host_;
#ifdef WIN32
    if ("localhost" == host) {
      host = org::apache::nifi::minifi::utils::net::getMyHostName();
    }
#endif
    std::string protocol = nifi.protocol_;
    int nifi_port = nifi.port_;
    std::stringstream fullUrl;
    fullUrl << protocol << host;
    // don't append port if it is 0 ( undefined )
    if (nifi_port > 0) {
      fullUrl << ":" << std::to_string(nifi_port);
    }
    fullUrl << "/nifi-api/site-to-site";

    configure_->get(Configure::nifi_rest_api_user_name, this->rest_user_name_);
    configure_->get(Configure::nifi_rest_api_password, this->rest_password_);

    std::string token;
    std::unique_ptr<http::BaseHTTPClient> client;
    if (!rest_user_name_.empty()) {
      std::stringstream loginUrl;
      loginUrl << protocol << host;
      // don't append port if it is 0 ( undefined )
      if (nifi_port > 0) {
        loginUrl << ":" << std::to_string(nifi_port);
      }
      loginUrl << "/nifi-api/access/token";

      auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
      if (nullptr == client_ptr) {
        logger_->log_error("Could not locate HTTPClient. You do not have cURL support!");
        return std::make_pair("", -1);
      }
      client = std::unique_ptr<http::BaseHTTPClient>(dynamic_cast<http::BaseHTTPClient*>(client_ptr));
      client->initialize(http::HttpRequestMethod::GET, loginUrl.str(), ssl_service);
      // use a connection timeout. if this times out we will simply attempt re-connection
      // so no need for configuration parameter that isn't already defined in Processor
      client->setConnectionTimeout(10s);
      client->setReadTimeout(idle_timeout_);

      token = http::get_token(client.get(), this->rest_user_name_, this->rest_password_);
      logger_->log_debug("Token from NiFi REST Api endpoint {},  {}", loginUrl.str(), token);
      if (token.empty())
        return std::make_pair("", -1);
    }

    auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
    if (nullptr == client_ptr) {
      logger_->log_error("Could not locate HTTPClient. You do not have cURL support, defaulting to base configuration!");
      return std::make_pair("", -1);
    }
    int siteTosite_port_ = -1;
    client = std::unique_ptr<http::BaseHTTPClient>(dynamic_cast<http::BaseHTTPClient*>(client_ptr));
    client->initialize(http::HttpRequestMethod::GET, fullUrl.str(), ssl_service);
    // use a connection timeout. if this times out we will simply attempt re-connection
    // so no need for configuration parameter that isn't already defined in Processor
    client->setConnectionTimeout(10s);
    client->setReadTimeout(idle_timeout_);
    if (!proxy_.host.empty()) {
      client->setHTTPProxy(proxy_);
    }
    if (!token.empty())
      client->setRequestHeader("Authorization", token);

    client->setVerbose(false);

    if (client->submit() && client->getResponseCode() == 200) {
      const std::vector<char> &response_body = client->getResponseBody();
      if (!response_body.empty()) {
        std::string controller = std::string(response_body.begin(), response_body.end());
        logger_->log_trace("controller config {}", controller);
        rapidjson::Document doc;
        rapidjson::ParseResult ok = doc.Parse(controller.c_str());

        if (ok && doc.IsObject() && !doc.ObjectEmpty()) {
          rapidjson::Value::MemberIterator itr = doc.FindMember("controller");

          if (itr != doc.MemberEnd() && itr->value.IsObject()) {
            rapidjson::Value controllerValue = itr->value.GetObject();
            rapidjson::Value::ConstMemberIterator end_itr = controllerValue.MemberEnd();
            rapidjson::Value::ConstMemberIterator port_itr = controllerValue.FindMember("remoteSiteListeningPort");
            rapidjson::Value::ConstMemberIterator secure_itr = controllerValue.FindMember("siteToSiteSecure");

            if (client_type_ == sitetosite::CLIENT_TYPE::RAW && port_itr != end_itr && port_itr->value.IsNumber())
              siteTosite_port_ = port_itr->value.GetInt();
            else
              siteTosite_port_ = nifi_port;

            if (secure_itr != end_itr && secure_itr->value.IsBool())
              this->site2site_secure_ = secure_itr->value.GetBool();
          }
          logger_->log_debug("process group remote site2site port {}, is secure {}", siteTosite_port_, site2site_secure_);
          return std::make_pair(host, siteTosite_port_);
        }
      } else {
        logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code {} from {}", client->getResponseCode(), fullUrl.str());
      }
    } else {
      logger_->log_error("ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed , response code {}\n", client->getResponseCode());
    }
  }
  return std::make_pair("", -1);
}