nodemanager/core/JobTaskTable.cpp (146 lines of code) (raw):
#include "JobTaskTable.h"
#include "../utils/WriterLock.h"
#include "../utils/ReaderLock.h"
#include "../utils/System.h"
#include <math.h>
using namespace hpc::core;
using namespace web;
using namespace hpc::data;
using namespace hpc::utils;
JobTaskTable* JobTaskTable::instance = nullptr;
json::value JobTaskTable::ToJson()
{
ReaderLock readerLock(&this->lock);
auto j = this->nodeInfo.ToJson();
return std::move(j);
}
int JobTaskTable::GetTaskCount()
{
ReaderLock readerLock(&this->lock);
int taskCount = 0;
for_each(this->nodeInfo.Jobs.begin(), this->nodeInfo.Jobs.end(),
[&taskCount] (auto& i) { taskCount += i.second->Tasks.size(); });
return taskCount;
}
int JobTaskTable::GetCoresInUse()
{
ReaderLock readerLock(&this->lock);
bool allUsed = false;
int cores, sockets;
System::CPU(cores, sockets);
std::vector<uint64_t> coresMask(ceil((float)cores / 64), 0);
for_each(this->nodeInfo.Jobs.begin(), this->nodeInfo.Jobs.end(), [&coresMask, &allUsed] (auto& job)
{
for_each(job.second->Tasks.begin(), job.second->Tasks.end(), [&coresMask, &allUsed] (auto& task)
{
if (task.second->Affinity.empty())
{
allUsed = true;
}
else
{
for (size_t i = 0; i < coresMask.size() && i < task.second->Affinity.size(); i++)
{
coresMask[i] |= task.second->Affinity[i];
}
}
});
});
if (allUsed)
{
return cores;
}
int used = 0;
uint64_t bit = 1;
for (int i = 0; i < cores; i++)
{
if (coresMask[i / 64] & (bit << i % 64))
{
used++;
}
}
return used;
}
std::shared_ptr<TaskInfo> JobTaskTable::AddJobAndTask(int jobId, int taskId, bool& isNewEntry)
{
WriterLock writerLock(&this->lock);
std::shared_ptr<JobInfo> job;
auto j = this->nodeInfo.Jobs.find(jobId);
if (j == this->nodeInfo.Jobs.end())
{
job = std::shared_ptr<JobInfo>(new JobInfo(jobId));
this->nodeInfo.Jobs[jobId] = job;
}
else
{
job = j->second;
}
std::shared_ptr<TaskInfo> task;
auto t = job->Tasks.find(taskId);
if (t == job->Tasks.end())
{
task = std::shared_ptr<TaskInfo>(new TaskInfo(jobId, taskId, nodeInfo.Name));
job->Tasks[taskId] = task;
isNewEntry = true;
}
else
{
task = t->second;
isNewEntry = false;
}
return task;
}
std::shared_ptr<TaskInfo> JobTaskTable::GetTask(int jobId, int taskId)
{
ReaderLock readerLock(&this->lock);
std::shared_ptr<TaskInfo> task;
auto j = this->nodeInfo.Jobs.find(jobId);
if (j != this->nodeInfo.Jobs.end())
{
auto t = j->second->Tasks.find(taskId);
if (t != j->second->Tasks.end())
{
task = t->second;
}
}
return task;
}
std::vector<std::shared_ptr<TaskInfo>> JobTaskTable::GetAllTasks()
{
ReaderLock readerLock(&this->lock);
std::vector<std::shared_ptr<TaskInfo>> tasks;
for (const auto& job : this->nodeInfo.Jobs)
{
for (const auto& task : job.second->Tasks)
{
tasks.push_back(task.second);
}
}
return std::move(tasks);
}
std::shared_ptr<JobInfo> JobTaskTable::RemoveJob(int jobId)
{
WriterLock writerLock(&this->lock);
std::shared_ptr<JobInfo> job;
auto j = this->nodeInfo.Jobs.find(jobId);
if (j != this->nodeInfo.Jobs.end())
{
job = j->second;
this->nodeInfo.Jobs.erase(j);
}
return job;
}
void JobTaskTable::RemoveTask(int jobId, int taskId, uint64_t attemptId)
{
WriterLock writerLock(&this->lock);
auto j = this->nodeInfo.Jobs.find(jobId);
if (j != this->nodeInfo.Jobs.end())
{
auto t = j->second->Tasks.find(taskId);
// only erase when attempt ID matches.
if (t != j->second->Tasks.end() &&
t->second->GetAttemptId() == attemptId)
{
j->second->Tasks.erase(t);
}
}
}