nodemanager/core/RemoteExecutor.h (69 lines of code) (raw):
#ifndef REMOTEEXECUTOR_H
#define REMOTEEXECUTOR_H
#include <set>
#include <map>
#include "IRemoteExecutor.h"
#include "JobTaskTable.h"
#include "Monitor.h"
#include "Process.h"
#include "Reporter.h"
#include "HostsManager.h"
#include "../arguments/MetricCountersConfig.h"
#include "../data/ProcessStatistics.h"
namespace hpc
{
namespace core
{
class RemoteExecutor : public IRemoteExecutor
{
public:
RemoteExecutor(const std::string& networkName);
~RemoteExecutor()
{
Logger::Info("Closing the Remote Executor.");
this->cts.cancel();
pthread_rwlock_destroy(&this->lock);
Logger::Info("Closed the Remote Executor.");
}
virtual pplx::task<web::json::value> StartJobAndTask(hpc::arguments::StartJobAndTaskArgs&& args, std::string&& callbackUri);
virtual pplx::task<web::json::value> StartTask(hpc::arguments::StartTaskArgs&& args, std::string&& callbackUri);
virtual pplx::task<web::json::value> EndJob(hpc::arguments::EndJobArgs&& args);
virtual pplx::task<web::json::value> EndTask(hpc::arguments::EndTaskArgs&& args, std::string&& callbackUri);
virtual pplx::task<web::json::value> Ping(std::string&& callbackUri);
virtual pplx::task<web::json::value> Metric(std::string&& callbackUri);
virtual pplx::task<web::json::value> MetricConfig(hpc::arguments::MetricCountersConfig&& config, std::string&& callbackUri);
virtual pplx::task<web::json::value> PeekTaskOutput(hpc::arguments::PeekTaskOutputArgs&& args);
protected:
private:
static void* GracePeriodElapsed(void* data);
void StartRegister();
void StartHeartbeat();
void UpdateStatistics();
void StartMetric();
void StartHostsManager();
void ResyncAndInvalidateCache();
const hpc::data::ProcessStatistics* TerminateTask(
int jobId, int taskId, int requeueCount,
uint64_t processKey, int exitCode, bool forced, bool mpiDockerTask);
void ReportTaskCompletion(int jobId, int taskId, int taskRequeueCount, json::value jsonBody, const std::string& callbackUri);
const int UnknowId = 999;
const int NodeInfoReportInterval = 30;
const int MetricReportInterval = 1;
const int RegisterInterval = 300;
const int DefaultHostsFetchInterval = 300;
const int MinHostsFetchInterval = 30;
JobTaskTable jobTaskTable;
Monitor monitor;
std::unique_ptr<Reporter<json::value>> nodeInfoReporter;
std::unique_ptr<Reporter<json::value>> registerReporter;
std::unique_ptr<Reporter<std::vector<std::vector<unsigned char>>>> metricReporter;
std::unique_ptr<HostsManager> hostsManager;
std::map<uint64_t, std::shared_ptr<Process>> processes;
std::map<int, std::tuple<std::string, bool, bool, bool, bool, std::string>> jobUsers;
std::map<std::string, std::set<int>> userJobs;
pthread_rwlock_t lock;
pplx::cancellation_token_source cts;
};
}
}
#endif // REMOTEEXECUTOR_H