nodemanager/core/RemoteCommunicator.cpp (259 lines of code) (raw):

#include <sstream> #include <stdlib.h> #include "RemoteCommunicator.h" #include "../utils/String.h" #include "../utils/System.h" #include "../arguments/StartJobAndTaskArgs.h" #include "../common/ErrorCodes.h" #include "NodeManagerConfig.h" #include "HttpHelper.h" #include "../filters/FilterException.h" #include "../arguments/MetricCountersConfig.h" using namespace web::http; using namespace web; using namespace hpc::utils; using namespace hpc::arguments; using namespace hpc::core; using namespace hpc::common; using namespace web::http::experimental::listener; using namespace hpc::filters; RemoteCommunicator::RemoteCommunicator(IRemoteExecutor& exec, const http_listener_config& config, const std::string& uri) : listeningUri(uri), isListening(false), localNodeName(System::GetNodeName()), executor(exec), listener(listeningUri, config) { this->listener.support( methods::POST, [this](auto request) { this->HandlePost(request); }); if (NodeManagerConfig::GetDebug()) { this->listener.support( methods::GET, [this](auto request) { this->HandleGet(request); }); } this->processors["startjobandtask"] = [this] (auto&& j, auto&& c) mutable -> pplx::task<json::value> { return this->StartJobAndTask(std::move(j), std::move(c)); }; this->processors["starttask"] = [this] (auto&& j, auto&& c) mutable -> pplx::task<json::value> { return this->StartTask(std::move(j), std::move(c)); }; this->processors["endjob"] = [this] (auto&& j, auto&& c) mutable -> pplx::task<json::value> { return this->EndJob(std::move(j), std::move(c)); }; this->processors["endtask"] = [this] (auto&& j, auto&& c) { return this->EndTask(std::move(j), std::move(c)); }; this->processors["ping"] = [this] (auto&& j, auto&& c) { return this->Ping(std::move(j), std::move(c)); }; this->processors["metric"] = [this] (auto&& j, auto&& c) { return this->Metric(std::move(j), std::move(c)); }; this->processors["metricconfig"] = [this] (auto&& j, auto&& c) { return this->MetricConfig(std::move(j), std::move(c)); }; this->processors["peektaskoutput"] = [this] (auto&& j, auto&& c) { return this->PeekTaskOutput(std::move(j), std::move(c)); }; } RemoteCommunicator::~RemoteCommunicator() { this->Close(); } void RemoteCommunicator::Open() { if (!this->isListening) { this->listener.open().then([this](auto t) { this->isListening = !IsError(t); Logger::Info( "Opening at {0}, result {1}", this->listeningUri, this->isListening ? "opened." : "failed."); if (!this->isListening) { exit((int)ErrorCodes::FailedToOpenPort); } }); } } void RemoteCommunicator::Close() { if (this->isListening) { try { Logger::Info("Closing the communicator {0}", this->listener.uri().to_string().c_str()); this->listener.close().wait(); Logger::Info("Closed the communicator {0}", this->listener.uri().to_string().c_str()); this->isListening = false; } catch (const std::exception& ex) { Logger::Error("Exception happened while close the listener {0}, {1}", this->listener.uri().to_string().c_str(), ex.what()); } } } void RemoteCommunicator::HandleGet(http_request request) { auto uri = request.relative_uri().to_string(); Logger::Info("Request (GET): Uri {0}", uri); json::value body; body["status"] = json::value::string("node manager working"); request.reply(status_codes::OK, body).then([this](auto t) { this->IsError(t); }); } void RemoteCommunicator::HandlePost(http_request request) { auto uri = request.relative_uri().to_string(); Logger::Info("Request: Uri {0}", uri); std::vector<std::string> tokens = String::Split(uri, '/'); if (tokens.size() < 4) { Logger::Warn("Not supported uri {0}", uri); return; } // skip the first '/'. int p = 1; auto apiSpace = tokens[p++]; auto nodeName = tokens[p++]; auto methodName = tokens[p++]; Logger::Debug("Request: Uri {0}, Node {1}, Method {2}", uri, nodeName, methodName); if (apiSpace != ApiSpace) { Logger::Error("Not allowed ApiSpace {0}", apiSpace); request.reply(status_codes::NotFound, U("Not found")) .then([this](auto t) { this->IsError(t); }); return; } std::string authenticationKey; if (HttpHelper::FindHeader(request, HttpHelper::AuthenticationHeaderKey, authenticationKey)) { Logger::Debug("AuthenticationKey found"); } if (NodeManagerConfig::GetClusterAuthenticationKey() != authenticationKey) { Logger::Warn("Authentication key validation failed."); request.reply(status_codes::Unauthorized, "").then([this](auto t) { this->IsError(t); }); return; } std::string callbackUri; if (HttpHelper::FindCallbackUri(request, callbackUri)) { Logger::Debug("CallbackUri found {0}", callbackUri.c_str()); } std::transform(nodeName.begin(), nodeName.end(), nodeName.begin(), ::toupper); if (nodeName != this->localNodeName) { // proxy to other node. request.extract_json().then( [callbackUri, nodeName, this, request, apiSpace, methodName] (pplx::task<json::value> t) { auto j = t.get(); auto req = HttpHelper::GetHttpRequest(methods::POST, j, callbackUri); uri_builder uriBuilder(this->listeningUri); if (nodeName == "LOCALHOST") { // only for test purpose, redirect the localhost to the local node name. uriBuilder.set_path(String::Join("", "/", apiSpace, "/", this->localNodeName, "/", methodName)); } else { uriBuilder.set_path(request.relative_uri().to_string()); } uriBuilder.set_host(nodeName); auto newUri = uriBuilder.to_string(); auto cli = HttpHelper::GetHttpClient(newUri); Logger::Info("Proxy to {0}", newUri); cli->request(*req).then([request, newUri](pplx::task<http_response> responseTask) { try { auto response = responseTask.get(); Logger::Info("Proxy result from {0} response code {1}", newUri, response.status_code()); request.reply(response).then([](auto t) { IsError(t); }); } catch (const std::exception& ex) { Logger::Error("Error when get response {0}", ex.what()); request.reply(http::status_codes::InternalError, json::value(ex.what())).then([](auto t) { IsError(t); }); } }); }); return; } auto processor = this->processors.find(methodName); if (processor != this->processors.end()) { request.extract_json().then( [processor, callback = std::move(callbackUri)] (pplx::task<json::value> t) { auto j = t.get(); // Logger::Debug("Json: {0}", j.serialize()); auto uri = callback; return processor->second(std::move(j), std::move(uri)); }) .then([request] (pplx::task<json::value> t) { try { Logger::Info("Replied with content {0}", t.get()); request.reply(status_codes::OK, t.get()).then([](auto t) { IsError(t); }); } catch (const web::http::http_exception& httpEx) { const std::string errorMessage = httpEx.what(); Logger::Error("Http exception occurred: {0}", errorMessage); request.reply(status_codes::InternalError, errorMessage).then([](auto t) { IsError(t); }); } catch (const FilterException& filterEx) { const std::string errorMessage = filterEx.what(); Logger::Error("Filter exception occurred: {0}", errorMessage); request.reply(status_codes::InternalError + 50, errorMessage).then([](auto t) { IsError(t); }); } catch (const std::exception& ex) { const std::string errorMessage = ex.what(); Logger::Error("Exception occurred: {0}", errorMessage); request.reply(status_codes::InternalError, errorMessage).then([](auto t) { IsError(t); }); } }); } else { Logger::Warn("Unable to find the method {0}", methodName.c_str()); request.reply(status_codes::NotFound, "Cannot find the method").then([this](auto t) { this->IsError(t); }); } } pplx::task<json::value> RemoteCommunicator::StartJobAndTask(json::value&& val, std::string&& callbackUri) { auto args = StartJobAndTaskArgs::FromJson(val); return this->filter.OnJobStart(args.JobId, args.TaskId, args.StartInfo.TaskRequeueCount, val).then( [this, callback = std::move(callbackUri)](pplx::task<json::value> t) { auto filteredJson = t.get(); auto uri = callback; return this->executor.StartJobAndTask(StartJobAndTaskArgs::FromJson(filteredJson), std::move(uri)); }); } pplx::task<json::value> RemoteCommunicator::StartTask(json::value&& val, std::string&& callbackUri) { auto args = StartTaskArgs::FromJson(val); return this->filter.OnTaskStart(args.JobId, args.TaskId, args.StartInfo.TaskRequeueCount, val).then( [this, callback = std::move(callbackUri)](pplx::task<json::value> t) { auto filteredJson = t.get(); auto uri = callback; return this->executor.StartTask(StartTaskArgs::FromJson(filteredJson), std::move(uri)); }); } pplx::task<json::value> RemoteCommunicator::EndJob(json::value&& val, std::string&& callbackUri) { auto args = EndJobArgs::FromJson(val); this->filter.OnJobEnd(args.JobId, val).then([this](pplx::task<json::value> t) { this->IsError(t); }); return this->executor.EndJob(std::move(args)); } pplx::task<json::value> RemoteCommunicator::EndTask(json::value&& val, std::string&& callbackUri) { auto args = EndTaskArgs::FromJson(val); return this->executor.EndTask(std::move(args), std::move(callbackUri)); } pplx::task<json::value> RemoteCommunicator::Ping(json::value&& val, std::string&& callbackUri) { return this->executor.Ping(std::move(callbackUri)); } pplx::task<json::value> RemoteCommunicator::Metric(json::value&& val, std::string&& callbackUri) { return this->executor.Metric(std::move(callbackUri)); } pplx::task<json::value> RemoteCommunicator::MetricConfig(json::value&& val, std::string&& callbackUri) { auto args = MetricCountersConfig::FromJson(val); return this->executor.MetricConfig(std::move(args), std::move(callbackUri)); } pplx::task<json::value> RemoteCommunicator::PeekTaskOutput(json::value&& val, std::string&& callbackUri) { auto args = PeekTaskOutputArgs::FromJson(val); return this->executor.PeekTaskOutput(std::move(args)); } const std::string RemoteCommunicator::ApiSpace = "api";