nodemanager/core/UdpReporter.cpp (128 lines of code) (raw):
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <cpprest/http_client.h>
#include "UdpReporter.h"
#include "NodeManagerConfig.h"
using namespace hpc::core;
using namespace hpc::utils;
UdpReporter::UdpReporter(
const std::string& name,
std::function<std::string(pplx::cancellation_token)> getReportUri,
int hold,
int interval,
std::function<std::vector<std::vector<unsigned char>>()> fetcher,
std::function<void(int)> onErrorFunc)
: Reporter<std::vector<std::vector<unsigned char>>>(name, getReportUri, hold, interval, fetcher, onErrorFunc)
{
}
void UdpReporter::ReConnect()
{
if (this->s)
{
close(this->s);
}
std::string uri;
try
{
uri = this->getReportUri(this->cts.get_token());
}
catch (const http::http_exception& httpEx)
{
Logger::Warn("UdpReporter, HttpException occurred when {2} report to {0}, ex {1}", uri, httpEx.what(), this->name);
}
catch (const std::exception& ex)
{
Logger::Error("UdpReporter, Exception occurred when {2} report to {0}, ex {1}", uri, ex.what(), this->name);
}
catch (...)
{
Logger::Error("UdpReporter, Unknown error occurred when {1} report to {0}", uri, this->name);
}
if (!uri.empty())
{
auto tokens = String::Split(uri, '/');
auto endpoint = String::Split(tokens[2], ':');
auto server = endpoint[0];
auto port = endpoint[1];
addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = 0;
hints.ai_protocol = 0;
addrinfo* siRemote, *current;
Logger::Info("UdpReporter, getaddrinfo server {0}, port {1}", server, port);
int ret = getaddrinfo(server.c_str(), port.c_str(), &hints, &siRemote);
if (ret != 0)
{
Logger::Error("UdpReporter, getaddrinfo failed {0}", gai_strerror(ret));
return;
}
bool success = false;
for (current = siRemote;
current != nullptr;
current = current->ai_next)
{
if ((this->s = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, IPPROTO_UDP)) == -1)
{
Logger::Warn("UdpReporter, create socket failed with errno {0}", errno);
continue;
}
if (connect(this->s, current->ai_addr, current->ai_addrlen) != -1)
{
Logger::Info("UdpReporter, connect succeeds.");
success = true;
break;
}
Logger::Warn("UdpReporter, connect failed with errno {0}.", errno);
close(this->s);
}
freeaddrinfo(siRemote);
this->uri = uri;
this->initialized = success;
}
else
{
this->initialized = false;
}
}
UdpReporter::~UdpReporter()
{
close(this->s);
this->Stop();
}
int UdpReporter::Report()
{
if (!this->initialized)
{
this->ReConnect();
if (!this->initialized)
{
return -1;
}
}
auto dataSet = this->valueFetcher();
// std::vector<int> dataInt;
// std::transform(data.cbegin(), data.cend(), std::back_inserter(dataInt), [] (unsigned char c) { return c; });
// Logger::Debug("Report datasize {0}, data {1}", data.size(), String::Join<' '>(dataInt));
for (const auto& data : dataSet)
{
auto buffer = &data[0];
if (NodeManagerConfig::GetDebug())
{
std::vector<int> d;
d.assign(data.begin(), data.end());
Logger::Debug("UdpReporter, Udp packet sent: {0}", String::Join<','>(d));
}
int ret = write(this->s, buffer, data.size());
if (ret == -1)
{
Logger::Error(
"UdpReporter, Error when sendto {0}, socket {1}, errno {2}",
this->uri,
this->s,
errno);
this->initialized = false;
}
}
return 0;
}