in libminifi/src/RemoteProcessorGroupPort.cpp [250:358]
std::pair<std::string, int> RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
if (nifi_instances_.empty())
return std::make_pair("", -1);
for (const auto& nifi : nifi_instances_) {
std::string host = nifi.host_;
#ifdef WIN32
if ("localhost" == host) {
host = org::apache::nifi::minifi::io::Socket::getMyHostName();
}
#endif
std::string protocol = nifi.protocol_;
int nifi_port = nifi.port_;
std::stringstream fullUrl;
fullUrl << protocol << host;
// don't append port if it is 0 ( undefined )
if (nifi_port > 0) {
fullUrl << ":" << std::to_string(nifi_port);
}
fullUrl << "/nifi-api/site-to-site";
configure_->get(Configure::nifi_rest_api_user_name, this->rest_user_name_);
configure_->get(Configure::nifi_rest_api_password, this->rest_password_);
std::string token;
std::unique_ptr<utils::BaseHTTPClient> client;
if (!rest_user_name_.empty()) {
std::stringstream loginUrl;
loginUrl << protocol << host;
// don't append port if it is 0 ( undefined )
if (nifi_port > 0) {
loginUrl << ":" << std::to_string(nifi_port);
}
loginUrl << "/nifi-api/access/token";
auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
if (nullptr == client_ptr) {
logger_->log_error("Could not locate HTTPClient. You do not have cURL support!");
return std::make_pair("", -1);
}
client = std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr));
client->initialize("GET", loginUrl.str(), ssl_service);
// use a connection timeout. if this times out we will simply attempt re-connection
// so no need for configuration parameter that isn't already defined in Processor
client->setConnectionTimeout(10s);
client->setReadTimeout(idle_timeout_);
token = utils::get_token(client.get(), this->rest_user_name_, this->rest_password_);
logger_->log_debug("Token from NiFi REST Api endpoint %s, %s", loginUrl.str(), token);
if (token.empty())
return std::make_pair("", -1);
}
auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
if (nullptr == client_ptr) {
logger_->log_error("Could not locate HTTPClient. You do not have cURL support, defaulting to base configuration!");
return std::make_pair("", -1);
}
int siteTosite_port_ = -1;
client = std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr));
client->initialize("GET", fullUrl.str(), ssl_service);
// use a connection timeout. if this times out we will simply attempt re-connection
// so no need for configuration parameter that isn't already defined in Processor
client->setConnectionTimeout(10s);
client->setReadTimeout(idle_timeout_);
if (!proxy_.host.empty()) {
client->setHTTPProxy(proxy_);
}
if (!token.empty())
client->setRequestHeader("Authorization", token);
if (client->submit() && client->getResponseCode() == 200) {
const std::vector<char> &response_body = client->getResponseBody();
if (!response_body.empty()) {
std::string controller = std::string(response_body.begin(), response_body.end());
logger_->log_trace("controller config %s", controller);
rapidjson::Document doc;
rapidjson::ParseResult ok = doc.Parse(controller.c_str());
if (ok && doc.IsObject() && !doc.ObjectEmpty()) {
rapidjson::Value::MemberIterator itr = doc.FindMember("controller");
if (itr != doc.MemberEnd() && itr->value.IsObject()) {
rapidjson::Value controllerValue = itr->value.GetObject();
rapidjson::Value::ConstMemberIterator end_itr = controllerValue.MemberEnd();
rapidjson::Value::ConstMemberIterator port_itr = controllerValue.FindMember("remoteSiteListeningPort");
rapidjson::Value::ConstMemberIterator secure_itr = controllerValue.FindMember("siteToSiteSecure");
if (client_type_ == sitetosite::CLIENT_TYPE::RAW && port_itr != end_itr && port_itr->value.IsNumber())
siteTosite_port_ = port_itr->value.GetInt();
else
siteTosite_port_ = nifi_port;
if (secure_itr != end_itr && secure_itr->value.IsBool())
this->site2site_secure_ = secure_itr->value.GetBool();
}
logger_->log_debug("process group remote site2site port %d, is secure %d", siteTosite_port_, site2site_secure_);
return std::make_pair(host, siteTosite_port_);
}
} else {
logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code %" PRId64 " from %s", client->getResponseCode(), fullUrl.str());
}
} else {
logger_->log_error("ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed , response code %d\n", client->getResponseCode());
}
}
return std::make_pair("", -1);
}