in nodemanager/core/RemoteCommunicator.cpp [102:244]
void RemoteCommunicator::HandlePost(http_request request)
{
auto uri = request.relative_uri().to_string();
Logger::Info("Request: Uri {0}", uri);
std::vector<std::string> tokens = String::Split(uri, '/');
if (tokens.size() < 4)
{
Logger::Warn("Not supported uri {0}", uri);
return;
}
// skip the first '/'.
int p = 1;
auto apiSpace = tokens[p++];
auto nodeName = tokens[p++];
auto methodName = tokens[p++];
Logger::Debug("Request: Uri {0}, Node {1}, Method {2}", uri, nodeName, methodName);
if (apiSpace != ApiSpace)
{
Logger::Error("Not allowed ApiSpace {0}", apiSpace);
request.reply(status_codes::NotFound, U("Not found"))
.then([this](auto t) { this->IsError(t); });
return;
}
std::string authenticationKey;
if (HttpHelper::FindHeader(request, HttpHelper::AuthenticationHeaderKey, authenticationKey))
{
Logger::Debug("AuthenticationKey found");
}
if (NodeManagerConfig::GetClusterAuthenticationKey() != authenticationKey)
{
Logger::Warn("Authentication key validation failed.");
request.reply(status_codes::Unauthorized, "").then([this](auto t) { this->IsError(t); });
return;
}
std::string callbackUri;
if (HttpHelper::FindCallbackUri(request, callbackUri))
{
Logger::Debug("CallbackUri found {0}", callbackUri.c_str());
}
std::transform(nodeName.begin(), nodeName.end(), nodeName.begin(), ::toupper);
if (nodeName != this->localNodeName)
{
// proxy to other node.
request.extract_json().then(
[callbackUri, nodeName, this, request, apiSpace, methodName] (pplx::task<json::value> t)
{
auto j = t.get();
auto req = HttpHelper::GetHttpRequest(methods::POST, j, callbackUri);
uri_builder uriBuilder(this->listeningUri);
if (nodeName == "LOCALHOST")
{
// only for test purpose, redirect the localhost to the local node name.
uriBuilder.set_path(String::Join("", "/", apiSpace, "/", this->localNodeName, "/", methodName));
}
else
{
uriBuilder.set_path(request.relative_uri().to_string());
}
uriBuilder.set_host(nodeName);
auto newUri = uriBuilder.to_string();
auto cli = HttpHelper::GetHttpClient(newUri);
Logger::Info("Proxy to {0}", newUri);
cli->request(*req).then([request, newUri](pplx::task<http_response> responseTask)
{
try
{
auto response = responseTask.get();
Logger::Info("Proxy result from {0} response code {1}", newUri, response.status_code());
request.reply(response).then([](auto t) { IsError(t); });
}
catch (const std::exception& ex)
{
Logger::Error("Error when get response {0}", ex.what());
request.reply(http::status_codes::InternalError, json::value(ex.what())).then([](auto t) { IsError(t); });
}
});
});
return;
}
auto processor = this->processors.find(methodName);
if (processor != this->processors.end())
{
request.extract_json().then(
[processor, callback = std::move(callbackUri)] (pplx::task<json::value> t)
{
auto j = t.get();
// Logger::Debug("Json: {0}", j.serialize());
auto uri = callback;
return processor->second(std::move(j), std::move(uri));
})
.then([request] (pplx::task<json::value> t)
{
try
{
Logger::Info("Replied with content {0}", t.get());
request.reply(status_codes::OK, t.get()).then([](auto t) { IsError(t); });
}
catch (const web::http::http_exception& httpEx)
{
const std::string errorMessage = httpEx.what();
Logger::Error("Http exception occurred: {0}", errorMessage);
request.reply(status_codes::InternalError, errorMessage).then([](auto t) { IsError(t); });
}
catch (const FilterException& filterEx)
{
const std::string errorMessage = filterEx.what();
Logger::Error("Filter exception occurred: {0}", errorMessage);
request.reply(status_codes::InternalError + 50, errorMessage).then([](auto t) { IsError(t); });
}
catch (const std::exception& ex)
{
const std::string errorMessage = ex.what();
Logger::Error("Exception occurred: {0}", errorMessage);
request.reply(status_codes::InternalError, errorMessage).then([](auto t) { IsError(t); });
}
});
}
else
{
Logger::Warn("Unable to find the method {0}", methodName.c_str());
request.reply(status_codes::NotFound, "Cannot find the method").then([this](auto t) { this->IsError(t); });
}
}