LinuxNodeMgr/JobTaskDb.cpp (272 lines of code) (raw):

#include "JobTaskDb.h"
#include "RemotingExecutor.h"

namespace
{
    // Holds a pthread mutex for the lifetime of the object so the mutex is
    // released on every exit path, including when an exception propagates.
    // Written without C++11 features to match the rest of this file.
    class ScopedMutexLock
    {
    public:
        explicit ScopedMutexLock(pthread_mutex_t& target) : held(target)
        {
            pthread_mutex_lock(&this->held);
        }

        ~ScopedMutexLock()
        {
            pthread_mutex_unlock(&this->held);
        }

    private:
        // Non-copyable: declared but intentionally not defined.
        ScopedMutexLock(const ScopedMutexLock&);
        ScopedMutexLock& operator=(const ScopedMutexLock&);

        pthread_mutex_t& held;
    };
}

// Thread entry point for metric reporting. Loops forever: whenever a metric
// report URI is set, passes it to HandleJson::Metric, then sleeps for
// Monitoring::Interval seconds. The thread enables asynchronous cancellation
// and only ends when it is cancelled (see ~JobTaskDb).
void* MetricThread(void* arg)
{
    // Fixed: this message used to say "report thread" (swapped with ReportingThread).
    std::cout << "Start metric thread." << std::endl;

    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    while (true)
    {
        std::string metricUri = JobTaskDb::GetInstance().GetMetricReportUri();
        if (!metricUri.empty())
        {
            HandleJson::Metric(metricUri);
        }

        sleep(Monitoring::Interval);
    }

    // Not reached: the loop above never exits.
    pthread_exit(0);
}

// Thread entry point for node reporting. Loops forever: whenever a report URI
// is set, passes it to HandleJson::Ping, then sleeps for 30 seconds. The thread
// enables asynchronous cancellation and only ends when it is cancelled.
void* ReportingThread(void* arg)
{
    // Fixed: this message used to say "metric thread" (swapped with MetricThread).
    std::cout << "Start report thread." << std::endl;

    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    while (true)
    {
        std::string reportUri = JobTaskDb::GetInstance().GetReportUri();
        if (!reportUri.empty())
        {
            HandleJson::Ping(reportUri);
        }

        sleep(30);
    }

    // Not reached: the loop above never exits.
    pthread_exit(0);
}

// Creates the executor, starts the monitoring service, initialises the lock
// and launches the two background threads.
// NOTE(review): both threads call JobTaskDb::GetInstance() right away; this
// assumes GetInstance() is safe to call while this constructor is still
// running -- confirm against the header.
JobTaskDb::JobTaskDb() : executor(new Executor())
{
    // start monitoring service
    monitoring.Start();

    pthread_mutex_init(&jobTaskDbLock, NULL);

    // The destructor treats 0 as "thread was never started", so make sure the
    // ids hold 0 unless pthread_create succeeded (on failure POSIX leaves the
    // id contents undefined).
    this->threadId = 0;
    this->metricThreadId = 0;

    if (pthread_create(&(this->threadId), NULL, ReportingThread, NULL) != 0)
    {
        this->threadId = 0;
        std::cout << "Failed to create report thread." << std::endl;
    }

    if (pthread_create(&(this->metricThreadId), NULL, MetricThread, NULL) != 0)
    {
        this->metricThreadId = 0;
        std::cout << "Failed to create metric thread." << std::endl;
    }
}

// Stops the background threads first, then releases everything they may use.
// Previously the executor, the monitoring service and the mutex were torn down
// while the threads could still be running.
// NOTE(review): the threads use asynchronous cancellation, so one of them could
// in principle be cancelled while it holds jobTaskDbLock -- confirm whether
// that can happen in practice.
JobTaskDb::~JobTaskDb()
{
    if (threadId != 0)
    {
        pthread_cancel(threadId);
        pthread_join(threadId, NULL);
    }

    if (metricThreadId != 0)
    {
        pthread_cancel(metricThreadId);
        pthread_join(metricThreadId, NULL);
    }

    // stop monitoring service
    monitoring.Stop();

    // delete on a null pointer is a no-op, so no guard is needed.
    delete executor;

    pthread_mutex_destroy(&jobTaskDbLock);
}

// Stores the report URI, derives the metric report URI from it, and writes the
// report URI to the file "ReportUri" (truncating any previous content). All of
// this happens while holding jobTaskDbLock.
void JobTaskDb::SetReportUri(const std::string& reportUri)
{
    ScopedMutexLock guard(jobTaskDbLock);

    this->reportUri = reportUri;
    this->metricReportUri = GetMetricReportUri(this->reportUri);

    std::ofstream uriFile("ReportUri", std::ios::trunc);
    uriFile << reportUri;

    std::cout << "ReportUri recorded: " << reportUri << std::endl;
}

// Returns a copy of the stored report URI (empty until SetReportUri is called).
// NOTE(review): reads without taking jobTaskDbLock while SetReportUri writes
// under it. Left unlocked on purpose: the mutex is non-recursive and callers
// outside this file may invoke this while the lock is held -- confirm before
// adding synchronisation.
const std::string JobTaskDb::GetReportUri()
{
    return this->reportUri;
}

// Returns a copy of the stored metric report URI. Same locking caveat as
// GetReportUri().
const std::string JobTaskDb::GetMetricReportUri()
{
    return this->metricReportUri;
}

// Builds the metric report URI from a report URI by replacing everything after
// the last '/' with "metricreported". Returns an empty string when reportUri
// contains no '/'.
std::string JobTaskDb::GetMetricReportUri(std::string & reportUri)
{
    size_t found = reportUri.find_last_of('/');
    if (found == std::string::npos)
    {
        return "";
    }

    std::string metricReportUri = reportUri.substr(0, found + 1) + "metricreported";
    std::cout << "GetMetricReportUri : " << metricReportUri << std::endl;
    return metricReportUri;
}

JobTaskDb* JobTaskDb::instance = NULL;

// Serialises this id as { "MetricId", "InstanceId" }.
json::value UMID::GetJson() const
{
    json::value obj;
    obj[U("MetricId")] = json::value(this->MetricId);
    obj[U("InstanceId")] = json::value(this->InstanceId);
    return obj;
}

// Serialises the metric report. "Umids" and "Values" are parallel arrays: for
// every key in Umids the value stored under the same key in Values is emitted.
// Values.at() throws std::out_of_range when a key has no matching value.
json::value ComputeNodeMetricInformation::GetJson() const
{
    json::value obj;
    obj[U("Name")] = json::value::string(this->Name);
    obj[U("Time")] = json::value::string(this->Time);

    std::vector<json::value> umidArray;
    std::vector<json::value> valueArray;

    for (std::map<int, UMID*>::const_iterator it = this->Umids.begin(); it != this->Umids.end(); ++it)
    {
        const int key = it->first;
        umidArray.push_back(it->second->GetJson());
        valueArray.push_back(json::value::number(this->Values.at(key)));
    }

    obj[U("Umids")] = json::value::array(umidArray);
    obj[U("Values")] = json::value::array(valueArray);
    obj[U("TickCount")] = json::value::number(this->TickCount);
    return obj;
}

// Serialises the task record field by field.
json::value ComputeClusterTaskInformation::GetJson() const
{
    json::value obj;
    obj[U("ExitCode")] = json::value::number(this->ExitCode);
    obj[U("Exited")] = json::value::boolean(this->Exited);
    obj[U("KernelProcessorTime")] = json::value(static_cast<int64_t>(this->KernelProcessorTime));
    obj[U("NumberOfProcesses")] = json::value::number(this->NumberOfProcesses);
    obj[U("PrimaryTask")] = json::value::boolean(this->PrimaryTask);
    obj[U("ProcessIds")] = json::value::string(this->ProcessIds);
    obj[U("TaskId")] = json::value::number(this->TaskId);
    obj[U("TaskRequeueCount")] = json::value::number(this->TaskRequeueCount);
    obj[U("UserProcessorTime")] = json::value(static_cast<int64_t>(this->UserProcessorTime));
    obj[U("WorkingSet")] = json::value::number(this->WorkingSet);
    obj[U("Message")] = json::value::string(this->Message);
    return obj;
}

// Wraps the task record as { "JobId", "TaskInfo" }.
json::value ComputeClusterTaskInformation::GetEventArgJson() const
{
    json::value obj;
    obj[U("JobId")] = json::value::number(this->JobId);
    obj[U("TaskInfo")] = this->GetJson();
    return obj;
}

// Serialises the job as { "JobId", "Tasks" } with one array entry per task.
json::value ComputeClusterJobInformation::GetJson() const
{
    json::value obj;
    obj[U("JobId")] = json::value::number(this->JobId);

    std::vector<json::value> taskArray;
    for (std::map<int, ComputeClusterTaskInformation*>::const_iterator it = this->Tasks.begin();
         it != this->Tasks.end();
         ++it)
    {
        taskArray.push_back(it->second->GetJson());
    }

    obj[U("Tasks")] = json::value::array(taskArray);
    return obj;
}

// Serialises the node record, including one "Jobs" array entry per known job.
json::value ComputeClusterNodeInformation::GetJson() const
{
    json::value obj;
    obj[U("Availability")] = json::value::number(this->Availability);
    obj[U("JustStarted")] = json::value::boolean(this->JustStarted);
    obj[U("MacAddress")] = json::value::string(this->MacAddress);
    obj[U("Name")] = json::value::string(this->Name);

    std::vector<json::value> jobArray;
    for (std::map<int, ComputeClusterJobInformation*>::const_iterator it = this->Jobs.begin();
         it != this->Jobs.end();
         ++it)
    {
        jobArray.push_back(it->second->GetJson());
    }

    obj[U("Jobs")] = json::value::array(jobArray);
    return obj;
}

// Returns the node record as JSON and then clears nodeInfo.JustStarted, so the
// flag value is serialised before it is reset. Runs under jobTaskDbLock.
json::value JobTaskDb::GetNodeInfo()
{
    ScopedMutexLock guard(jobTaskDbLock);

    json::value obj = this->nodeInfo.GetJson();
    this->nodeInfo.JustStarted = false;
    return obj;
}

// Builds a metric report for this node from the monitoring service: CPU usage,
// available memory and network usage, in slots 0, 1 and 2.
// The UMID objects are locals whose addresses are stored in cnmi.Umids; they
// stay alive until after cnmi.GetJson() has run.
// NOTE(review): assumes ComputeNodeMetricInformation does not take ownership of
// (or delete) the pointers in Umids -- confirm against its declaration.
// NOTE(review): reads nodeInfo.Name without taking jobTaskDbLock.
json::value JobTaskDb::GetMetricInfo()
{
    ComputeNodeMetricInformation cnmi(nodeInfo.Name);

    // add metric data
    UMID cpuUsage(1, 1);
    cnmi.Umids[0] = &cpuUsage;
    cnmi.Values[0] = monitoring.GetCpuUsage();

    UMID memoryUsage(3, 0);
    cnmi.Umids[1] = &memoryUsage;
    cnmi.Values[1] = monitoring.GetAvailableMemory();

    UMID networkUsage(12, 1);
    cnmi.Umids[2] = &networkUsage;
    cnmi.Values[2] = monitoring.GetNetworkUsage();

    json::value obj = cnmi.GetJson();
    return obj;
}

// Registers the job if it is not known yet and, if the task is not known yet,
// starts it through the executor and records the returned task info. A task
// that is already recorded is left untouched. Runs under jobTaskDbLock; the
// lock is released even if Executor::StartTask throws.
void JobTaskDb::StartJobAndTask(int jobId, int taskId, ProcessStartInfo* startInfo, const std::string& callbackUri)
{
    std::cout << "Enter StartJobAndTask" << std::endl;
    ScopedMutexLock guard(jobTaskDbLock);
    std::cout << "GetLock" << std::endl;

    ComputeClusterJobInformation* jobInfo = NULL;
    std::map<int, ComputeClusterJobInformation*>::iterator jobIt = this->nodeInfo.Jobs.find(jobId);
    std::cout << "JobInfoFound" << std::endl;

    if (jobIt == this->nodeInfo.Jobs.end())
    {
        jobInfo = new ComputeClusterJobInformation(jobId);
        this->nodeInfo.Jobs[jobId] = jobInfo;
        std::cout << "Job added" << std::endl;
    }
    else
    {
        jobInfo = jobIt->second;
    }

    std::map<int, ComputeClusterTaskInformation*>::iterator taskIt = jobInfo->Tasks.find(taskId);
    std::cout << "TaskInfoFound" << std::endl;

    if (taskIt == jobInfo->Tasks.end())
    {
        std::cout << "Before EXE startTask" << std::endl;
        ComputeClusterTaskInformation* taskInfo =
            this->executor->StartTask(jobId, taskId, startInfo, callbackUri);
        std::cout << "After Exe StartTask" << std::endl;
        jobInfo->Tasks[taskId] = taskInfo;
    }
}

// Ends every task of the job through the executor, then deletes the job record
// and removes it from the node. Does nothing for an unknown job id.
void JobTaskDb::EndJob(int jobId)
{
    ScopedMutexLock guard(jobTaskDbLock);

    std::map<int, ComputeClusterJobInformation*>::iterator jobIt = this->nodeInfo.Jobs.find(jobId);
    if (jobIt == this->nodeInfo.Jobs.end())
    {
        return;
    }

    ComputeClusterJobInformation* jobInfo = jobIt->second;
    for (std::map<int, ComputeClusterTaskInformation*>::iterator taskIt = jobInfo->Tasks.begin();
         taskIt != jobInfo->Tasks.end();
         ++taskIt)
    {
        this->executor->EndTask(taskIt->second);
    }

    delete jobInfo;
    this->nodeInfo.Jobs.erase(jobIt);
}

// Ends a single task through the executor. The task stays recorded in the job;
// RemoveTask drops it. Does nothing for an unknown job or task id.
void JobTaskDb::EndTask(int jobId, int taskId)
{
    ScopedMutexLock guard(jobTaskDbLock);

    std::map<int, ComputeClusterJobInformation*>::iterator jobIt = this->nodeInfo.Jobs.find(jobId);
    if (jobIt == this->nodeInfo.Jobs.end())
    {
        return;
    }

    ComputeClusterJobInformation* jobInfo = jobIt->second;
    std::map<int, ComputeClusterTaskInformation*>::iterator taskIt = jobInfo->Tasks.find(taskId);
    if (taskIt != jobInfo->Tasks.end())
    {
        this->executor->EndTask(taskIt->second);
    }
}

// Removes the task entry from its job. Does nothing for an unknown job or task
// id.
// NOTE(review): only the map entry is erased; the ComputeClusterTaskInformation
// object itself is not deleted here. Presumably it is owned elsewhere (it is
// created by Executor::StartTask) -- confirm, otherwise this leaks.
void JobTaskDb::RemoveTask(int jobId, int taskId)
{
    ScopedMutexLock guard(jobTaskDbLock);

    std::map<int, ComputeClusterJobInformation*>::iterator jobIt = this->nodeInfo.Jobs.find(jobId);
    if (jobIt == this->nodeInfo.Jobs.end())
    {
        return;
    }

    ComputeClusterJobInformation* jobInfo = jobIt->second;
    std::map<int, ComputeClusterTaskInformation*>::iterator taskIt = jobInfo->Tasks.find(taskId);
    if (taskIt != jobInfo->Tasks.end())
    {
        jobInfo->Tasks.erase(taskIt);
    }
}