nodemanager/core/Reporter.h (92 lines of code) (raw):
#ifndef REPORTER_H
#define REPORTER_H
#include <pthread.h>
#include <unistd.h>
#include <atomic>
#include <cmath>
#include <functional>
#include <string>
#include <utility>
#include <cpprest/json.h>
#include "../utils/Logger.h"
#include "NamingClient.h"
using namespace hpc::utils;
namespace hpc
{
namespace core
{
template<typename ReportType>
class Reporter
{
public:
Reporter(std::string reporterName, std::function<std::string(pplx::cancellation_token)> getUri, int hold, int interval, std::function<ReportType()> fetcher, std::function<void(int)> onErrorFunc, int retryFactor = 1)
: name(reporterName), getReportUri(getUri), valueFetcher(fetcher), onError(onErrorFunc), intervalSeconds(interval), holdSeconds(hold), errorRetryMultiplyFactor(retryFactor)
{
}
void Start()
{
if (this->getReportUri)
{
pthread_create(&this->threadId, nullptr, ReportingThread, this);
Logger::Debug("Started the thread {0} for Reporter {1}", this->threadId, this->name);
}
}
void Stop()
{
Logger::Debug("Stopping the thread {0} for Reporter {1}", this->threadId, this->name);
this->isRunning = false;
this->cts.cancel();
if (this->threadId != 0)
{
while (this->inRequest) usleep(1);
pthread_join(this->threadId, nullptr);
Logger::Debug("Stopped the thread {0} for Reporter {1}", this->threadId, this->name);
}
}
virtual ~Reporter()
{
Logger::Debug("Destructed Reporter {0}", this->name);
}
virtual int Report() = 0;
protected:
std::string name;
std::function<std::string(pplx::cancellation_token)> getReportUri;
std::function<ReportType()> valueFetcher;
std::function<void(int)> onError;
int intervalSeconds;
pplx::cancellation_token_source cts;
private:
static void* ReportingThread(void* arg)
{
Reporter* r = static_cast<Reporter*>(arg);
sleep(r->holdSeconds);
while (r->isRunning)
{
bool needRetry = false;
if (r->getReportUri)
{
r->inRequest = true;
if ((needRetry = (0 != r->Report())))
{
if (r->onError)
{
r->onError(r->retryCount++);
}
}
else
{
r->retryCount = 0;
}
r->inRequest = false;
}
int retrySeconds = r->ErrorRetrySecondsInit * pow(r->errorRetryMultiplyFactor, r->retryCount);
if (r->isRunning) sleep(needRetry && retrySeconds > 0 && retrySeconds < r->intervalSeconds ? retrySeconds : r->intervalSeconds);
}
return nullptr;
}
const int ErrorRetrySecondsInit = 2;
int holdSeconds;
int errorRetryMultiplyFactor;
int retryCount = 0;
pthread_t threadId = 0;
bool isRunning = true;
bool inRequest = false;
};
}
}
#endif // REPORTER_H