nodemanager/core/NamingClient.cpp (96 lines of code) (raw):

#include "NamingClient.h" #include "NodeManagerConfig.h" #include "../utils/Logger.h" #include "../utils/ReaderLock.h" #include "HttpHelper.h" #include <stdlib.h> using namespace web::http; using namespace web::http::client; using namespace hpc::core; using namespace hpc::utils; void NamingClient::InvalidateCache() { auto instance = GetInstance(NodeManagerConfig::GetNamingServiceUri()); if (instance) { WriterLock writerLock(&instance->lock); Logger::Debug("ResolveServiceLocation> Cleared."); instance->serviceLocations.clear(); } } std::string NamingClient::GetServiceLocation(const std::string& serviceName, pplx::cancellation_token token) { std::map<std::string, std::string>::iterator location; { ReaderLock readerLock(&this->lock); location = this->serviceLocations.find(serviceName); } if (location == this->serviceLocations.end()) { { ReaderLock readerLock(&this->lock); Logger::Debug("ResolveServiceLocation> there is no entry for {0}", serviceName); for (auto kvp : this->serviceLocations) { Logger::Debug("ResolveServiceLocation> entry {0} = {1}", kvp.first, kvp.second); } location = this->serviceLocations.find(serviceName); } if (location == this->serviceLocations.end()) { std::string temp; this->RequestForServiceLocation(serviceName, temp, token); { WriterLock writerLock(&this->lock); this->serviceLocations[serviceName] = temp; location = this->serviceLocations.find(serviceName); } } } Logger::Debug("ResolveServiceLocation> Resolved serviceLocation {0} for {1}", location->second, serviceName); return location->second; } void NamingClient::RequestForServiceLocation(const std::string& serviceName, std::string& serviceLocation, pplx::cancellation_token token) { int selected = rand() % this->namingServicesUri.size(); std::string uri; int interval = this->intervalSeconds; while (!token.is_canceled()) { try { selected %= this->namingServicesUri.size(); uri = this->namingServicesUri[selected++] + serviceName; Logger::Info("ResolveServiceLocation> Fetching from {0}", uri); auto client = HttpHelper::GetHttpClient(uri); auto request = HttpHelper::GetHttpRequest(methods::GET); http_response response = client->request(*request, token).get(); if (response.status_code() == http::status_codes::OK) { serviceLocation = JsonHelper<std::string>::FromJson(response.extract_json().get()); Logger::Debug("ResolveServiceLocation> Fetched from {0} response code {1}, location {2}", uri, response.status_code(), serviceLocation); return; } else { Logger::Debug("ResolveServiceLocation> Fetched from {0} response code {1}", uri, response.status_code()); } } catch (const http_exception& httpEx) { Logger::Warn("ResolveServiceLocation> HttpException occurred when fetching from {0}, ex {1}", uri, httpEx.what()); } catch (const std::exception& ex) { Logger::Error("ResolveServiceLocation> Exception occurred when fetching from {0}, ex {1}", uri, ex.what()); } catch (...) { Logger::Error("ResolveServiceLocation> Unknown error occurred when fetching from {0}", uri); } if (token.is_canceled()) break; sleep(interval); interval *= 2; if (interval > 60) interval = 60; } }