nodemanager/core/RemoteExecutor.cpp (685 lines of code) (raw):
#include <cpprest/http_client.h>
#include <memory>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include "RemoteExecutor.h"
#include "HttpReporter.h"
#include "UdpReporter.h"
#include "../utils/WriterLock.h"
#include "../utils/ReaderLock.h"
#include "../utils/Logger.h"
#include "../utils/System.h"
#include "../common/ErrorCodes.h"
#include "../data/ProcessStatistics.h"
#include "NodeManagerConfig.h"
#include "HttpHelper.h"
using namespace web::http;
using namespace web;
using namespace hpc::core;
using namespace hpc::utils;
using namespace hpc::arguments;
using namespace hpc::data;
using namespace hpc::common;
/// Constructs the executor for the given network and launches its background
/// services.
///
/// @param networkName  Network name forwarded to the monitor together with the
///                     local node name and the metric report interval.
RemoteExecutor::RemoteExecutor(const std::string& networkName)
    : monitor(System::GetNodeName(), networkName, MetricReportInterval),
      lock(PTHREAD_RWLOCK_INITIALIZER)
{
    // Start-up sequence: register reporter, heartbeat reporter, metric
    // reporter, hosts manager.
    StartRegister();
    StartHeartbeat();
    StartMetric();
    StartHostsManager();
}
// Prepares the Linux account for a job (user creation and SSH keys), records
// the job <-> user association, and then starts the job's task via StartTask.
//
// args:        job/task ids, credentials (UserName, Password, PrivateKey,
//              PublicKey) and the task start info.
// callbackUri: forwarded unchanged to StartTask.
// Returns whatever StartTask returns.
// Throws std::runtime_error when System::CreateUser returns a code other than
// 0 or 9.
pplx::task<json::value> RemoteExecutor::StartJobAndTask(StartJobAndTaskArgs&& args, std::string&& callbackUri)
{
// The writer lock is scoped to the user/key bookkeeping only; it is released
// before StartTask is called because StartTask acquires the same lock itself.
{
WriterLock writerLock(&this->lock);
const auto& envi = args.StartInfo.EnvironmentVariables;
// Both flags are considered set only when the variable exists and equals "1".
auto isAdminIt = envi.find("CCP_ISADMIN");
bool isAdmin = isAdminIt != envi.end() && isAdminIt->second == "1";
auto mapAdminUserIt = envi.find("CCP_MAP_ADMIN_USER");
bool mapAdminUser = mapAdminUserIt != envi.end() && mapAdminUserIt->second == "1";
const std::string WindowsSystemUser = "NT AUTHORITY\\SYSTEM";
bool mapAdminToRoot = isAdmin && !mapAdminUser;
bool mapAdminToUser = isAdmin && mapAdminUser;
// Case-insensitive comparison against the Windows local system account name.
bool isWindowsSystemAccount = boost::iequals(args.UserName, WindowsSystemUser);
std::string userName;
bool existed;
// Use root user in 3 scenarios:
// 1. This is old image, username is empty, we use root.
// 2. User is Windows or HPC Administrator and CCP_MAP_ADMIN_USER is not set
// 3. User is Windows local system account, which is mapped to Linux root user.
if (args.UserName.empty() || mapAdminToRoot || isWindowsSystemAccount)
{
userName = "root";
existed = true;
}
else
{
// With CCP_PRESERVE_DOMAIN == "1" the user name is used verbatim; otherwise it
// is passed through String::GetUserName (presumably strips the domain part --
// confirm against String::GetUserName).
auto preserveDomainIt = envi.find("CCP_PRESERVE_DOMAIN");
bool preserveDomain = preserveDomainIt != envi.end() && preserveDomainIt->second == "1";
userName = preserveDomain ? args.UserName : String::GetUserName(args.UserName);
// A non-admin-mapped user literally named "root" must not get the real root account.
if (userName == "root") { userName = "hpc_faked_root"; }
int ret = System::CreateUser(userName, args.Password, isAdmin);
// Return code 9 is treated as "user already existed" (presumably the useradd
// exit status for a duplicate name -- confirm against System::CreateUser).
existed = ret == 9;
if (ret != 0 && ret != 9)
{
throw std::runtime_error(
String::Join(" ", "Create user", userName, "failed with error code", ret));
}
Logger::Debug(args.JobId, args.TaskId, this->UnknowId, "Create user {0} return code: {1}.", userName, ret);
}
bool privateKeyAdded = false;
bool publicKeyAdded = false;
bool authKeyAdded = false;
// Set SSH keys in 3 scenarios:
// 1. User is not a Windows or HPC Administrator.
// 2. User is Windows or HPC Administrator and it is mapped to non-root user in Linux.
// 3. User is Windows local system account, which is mapped to Linux root user.
if (!isAdmin || mapAdminToUser || isWindowsSystemAccount)
{
std::string privateKeyFile;
privateKeyAdded = 0 == System::AddSshKey(userName, args.PrivateKey, "id_rsa", "600", privateKeyFile);
// No public key supplied: derive it from the private key file just written,
// storing the command output back into args.PublicKey.
if (privateKeyAdded && args.PublicKey.empty())
{
int ret = System::ExecuteCommandOut(args.PublicKey, "ssh-keygen -y -f ", privateKeyFile);
if (ret != 0)
{
Logger::Error(args.JobId, args.TaskId, this->UnknowId, "Retrieve public key failed with exitcode {0}.", ret);
}
}
// Each later step is attempted only if the previous ones succeeded.
std::string publicKeyFile;
publicKeyAdded = privateKeyAdded && (0 == System::AddSshKey(userName, args.PublicKey, "id_rsa.pub", "644", publicKeyFile));
std::string userAuthKeyFile;
authKeyAdded = privateKeyAdded && publicKeyAdded && (0 == System::AddAuthorizedKey(userName, args.PublicKey, "600", userAuthKeyFile));
Logger::Debug(args.JobId, args.TaskId, this->UnknowId,
"Add ssh key for user {0} result: private {1}, public {2}, auth {3}",
userName, privateKeyAdded, publicKeyAdded, authKeyAdded);
}
// Record the per-job user info only once; an existing entry for this job is
// left untouched. EndJob reads this tuple to decide which keys to remove.
if (this->jobUsers.find(args.JobId) == this->jobUsers.end())
{
Logger::Debug(args.JobId, args.TaskId, this->UnknowId,
"Create user: jobUsers entry added.");
this->jobUsers[args.JobId] =
std::tuple<std::string, bool, bool, bool, bool, std::string>(userName, existed, privateKeyAdded, publicKeyAdded, authKeyAdded, args.PublicKey);
}
// Track which jobs are using this user so EndJob can tell when the last one ends.
auto it = this->userJobs.find(userName);
if (it != this->userJobs.end())
{
it->second.insert(args.JobId);
}
else
{
this->userJobs[userName] = { args.JobId };
}
}
return this->StartTask(StartTaskArgs(args.JobId, args.TaskId, std::move(args.StartInfo)), std::move(callbackUri));
}
// Starts a task of a job that was previously set up by StartJobAndTask.
//
// args:        job/task ids and the task start info (command line, stdio
//              files, working directory, affinity, environment).
// callbackUri: URI used to report the task completion once the process exits.
// Returns an already-completed task holding an empty json value.
// Throws std::runtime_error when no user has been recorded for the job
// (i.e. the job was not started on this node); the job is removed from the
// job/task table before throwing.
pplx::task<json::value> RemoteExecutor::StartTask(StartTaskArgs&& args, std::string&& callbackUri)
{
WriterLock writerLock(&this->lock);
// isNewEntry is an out-parameter filled by AddJobAndTask.
bool isNewEntry;
std::shared_ptr<TaskInfo> taskInfo = this->jobTaskTable.AddJobAndTask(args.JobId, args.TaskId, isNewEntry);
taskInfo->Affinity = args.StartInfo.Affinity;
taskInfo->SetTaskRequeueCount(args.StartInfo.TaskRequeueCount);
std::string userName = "root";
auto jobUser = this->jobUsers.find(args.JobId);
if (jobUser == this->jobUsers.end())
{
this->jobTaskTable.RemoveJob(args.JobId);
throw std::runtime_error(String::Join(" ", "Job", args.JobId, "was not started on this node."));
}
else
{
// First tuple element is the Linux user name chosen in StartJobAndTask.
userName = std::get<0>(jobUser->second);
}
// An empty command line means no process is created for this task; at most an
// MPI container is started when a docker image is configured.
if (args.StartInfo.CommandLine.empty())
{
Logger::Info(args.JobId, args.TaskId, args.StartInfo.TaskRequeueCount, "MPI non-master task found, skip creating the process.");
std::string dockerImage = args.StartInfo.EnvironmentVariables["CCP_DOCKER_IMAGE"];
std::string isNvidiaDocker = args.StartInfo.EnvironmentVariables["CCP_DOCKER_NVIDIA"];
std::string additionalOption = args.StartInfo.EnvironmentVariables["CCP_DOCKER_START_OPTION"];
std::string skipSshSetup = args.StartInfo.EnvironmentVariables["CCP_DOCKER_SKIP_SSH_SETUP"];
if (!dockerImage.empty())
{
taskInfo->IsPrimaryTask = false;
std::string output;
// NOTE(review): String::Join appears to take the separator first (see the
// " "-separated messages above), so joining two quote characters with the
// value as separator wraps the value in double quotes -- confirm.
dockerImage = String::Join(dockerImage, "\"", "\"");
isNvidiaDocker = String::Join(isNvidiaDocker, "\"", "\"");
// Escape embedded double quotes before wrapping the option string in quotes.
boost::replace_all(additionalOption, "\"", "\\\"");
additionalOption = String::Join(additionalOption, "\"", "\"");
skipSshSetup = String::Join(skipSshSetup, "\"", "\"");
int ret = System::ExecuteCommandOut(output, "/bin/bash 2>&1", "StartMpiContainer.sh", taskInfo->TaskId, userName, dockerImage, isNvidiaDocker, additionalOption, skipSshSetup);
if (ret == 0)
{
Logger::Info(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetTaskRequeueCount(), "Start MPI container successfully.");
}
else
{
Logger::Error(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetTaskRequeueCount(), "Start MPI container failed with exitcode {0}. {1}", ret, output);
}
}
}
else
{
// Only create a process when the task entry is new and no process is
// registered under its process key yet.
if (this->processes.find(taskInfo->ProcessKey) == this->processes.end() &&
isNewEntry)
{
auto process = std::shared_ptr<Process>(new Process(
taskInfo->JobId,
taskInfo->TaskId,
taskInfo->GetTaskRequeueCount(),
"Task",
std::move(args.StartInfo.CommandLine),
std::move(args.StartInfo.StdOutFile),
std::move(args.StartInfo.StdErrFile),
std::move(args.StartInfo.StdInFile),
std::move(args.StartInfo.WorkDirectory),
userName,
true,
std::move(args.StartInfo.Affinity),
std::move(args.StartInfo.EnvironmentVariables),
// Completion callback: records the result on taskInfo, reports it to the
// callback URI and finally drops the Process object from the map.
[taskInfo, uri = std::move(callbackUri), this] (
int exitCode,
std::string&& message,
const ProcessStatistics& stat)
{
try
{
// Stays null when the task was already ended elsewhere; ReportTaskCompletion
// skips the callback for a null body.
json::value jsonBody;
taskInfo->CancelGracefulThread();
{
WriterLock writerLock(&this->lock);
if (taskInfo->Exited)
{
Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetTaskRequeueCount(),
"Ended already by EndTask.");
}
else
{
taskInfo->Exited = true;
taskInfo->ExitCode = exitCode;
taskInfo->Message = std::move(message);
taskInfo->AssignFromStat(stat);
jsonBody = taskInfo->ToCompletionEventArgJson();
}
}
// Reported outside the writer lock scope above.
this->ReportTaskCompletion(taskInfo->JobId, taskInfo->TaskId,
taskInfo->GetTaskRequeueCount(), jsonBody, uri);
// this won't remove the task entry added later as attempt id doesn't match
this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId());
}
catch (const std::exception& ex)
{
Logger::Error(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetTaskRequeueCount(),
"Exception when sending back task result. {0}", ex.what());
}
Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetTaskRequeueCount(),
"attemptId {0}, processKey {1}, erasing process", taskInfo->GetAttemptId(), taskInfo->ProcessKey);
{
WriterLock writerLock(&this->lock);
// Process will be deleted here.
this->processes.erase(taskInfo->ProcessKey);
}
}));
// Register the process before starting it so the callback can erase it later.
this->processes[taskInfo->ProcessKey] = process;
Logger::Debug(
args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(),
"StartTask for ProcessKey {0}, process count {1}", taskInfo->ProcessKey, this->processes.size());
// The continuation only logs the ids when a positive pid is reported.
process->Start(process).then([this, taskInfo] (std::pair<pid_t, pthread_t> ids)
{
if (ids.first > 0)
{
Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetTaskRequeueCount(),
"Process started pid {0}, tid {1}", ids.first, ids.second);
}
});
}
else
{
Logger::Warn(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetTaskRequeueCount(),
"The task has started already.");
}
}
return pplx::task_from_result(json::value());
}
// Ends a job: removes it from the job/task table, force-terminates all of its
// tasks, and cleans up the SSH keys of the job's user once no other job on
// this node is associated with that user.
//
// args: carries the id of the job to end.
// Returns an already-completed task holding the job's json (null json when
// the job was not found in the table).
pplx::task<json::value> RemoteExecutor::EndJob(hpc::arguments::EndJobArgs&& args)
{
WriterLock writerLock(&this->lock);
Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: starting");
auto jobInfo = this->jobTaskTable.RemoveJob(args.JobId);
json::value jsonBody;
if (jobInfo)
{
for (auto& taskPair : jobInfo->Tasks)
{
auto taskInfo = taskPair.second;
if (taskInfo)
{
// Forced termination (forced == true); tasks that are not primary are treated
// as MPI docker tasks by TerminateTask.
const auto* stat = this->TerminateTask(
args.JobId, taskPair.first, taskInfo->GetTaskRequeueCount(),
taskInfo->ProcessKey, (int)ErrorCodes::EndJobExitCode, true, !taskInfo->IsPrimaryTask);
Logger::Debug(args.JobId, taskPair.first, taskInfo->GetTaskRequeueCount(), "EndJob: Terminating task");
// stat is nullptr when TerminateTask found no process object (or handled an
// MPI docker task); the task info is left unchanged in that case.
if (stat != nullptr)
{
taskInfo->Exited = stat->IsTerminated();
taskInfo->ExitCode = (int)ErrorCodes::EndJobExitCode;
taskInfo->AssignFromStat(*stat);
taskInfo->CancelGracefulThread();
}
}
else
{
Logger::Warn(args.JobId, taskPair.first, this->UnknowId,
"EndJob: Task is already finished");
// A null task entry is not expected here; trips in builds with assertions enabled.
assert(false);
}
}
jsonBody = jobInfo->ToJson();
Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: ended {0}", jsonBody);
}
else
{
Logger::Warn(args.JobId, this->UnknowId, this->UnknowId, "EndJob: Job is already finished");
}
// User cleanup runs even when the job was not found in the job/task table.
auto jobUser = this->jobUsers.find(args.JobId);
if (jobUser != this->jobUsers.end())
{
Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: Cleanup user {0}", std::get<0>(jobUser->second));
auto userJob = this->userJobs.find(std::get<0>(jobUser->second));
bool cleanupUser = false;
if (userJob == this->userJobs.end())
{
// No job set is tracked for this user, so nothing else can be using it.
cleanupUser = true;
}
else
{
userJob->second.erase(args.JobId);
// cleanup when no one is using the user;
cleanupUser = userJob->second.empty();
Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
"EndJob: {0} jobs associated with the user {1}", userJob->second.size(), std::get<0>(jobUser->second));
if (cleanupUser)
{
this->userJobs.erase(userJob);
}
}
if (cleanupUser)
{
// Unpack the tuple stored by StartJobAndTask:
// (userName, existed, privateKeyAdded, publicKeyAdded, authKeyAdded, publicKey).
std::string userName, publicKey;
bool existed, privateKeyAdded, publicKeyAdded, authKeyAdded;
std::tie(userName, existed, privateKeyAdded, publicKeyAdded, authKeyAdded, publicKey) = jobUser->second;
// the existed could be true for the later job, so the user will be left
// on the node, which is by design.
// we just have this delete user logic for a simple way of cleanup.
// if delete user failed, cleanup keys as necessary.
// User deletion is currently disabled (commented out below), so keys are
// always cleaned up.
bool cleanupKeys = true;
// if (!existed)
// {
// if (!userName.empty())
// {
// Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
// "EndJob: Delete user {0}", userName);
//
// cleanupKeys = 0 != System::DeleteUser(userName);
// }
// }
if (cleanupKeys)
{
// Remove only the artifacts that StartJobAndTask reported as added.
if (privateKeyAdded)
{
Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
"EndJob: RemoveSshKey id_rsa: {0}", userName);
System::RemoveSshKey(userName, "id_rsa");
}
if (publicKeyAdded)
{
Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
"EndJob: RemoveSshKey id_rsa.pub: {0}", userName);
System::RemoveSshKey(userName, "id_rsa.pub");
}
if (authKeyAdded)
{
Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
"EndJob: RemoveAuthorizedKey {0}", userName);
System::RemoveAuthorizedKey(userName, publicKey);
}
}
}
// The job -> user record is dropped regardless of whether keys were cleaned up.
this->jobUsers.erase(jobUser);
}
return pplx::task_from_result(jsonBody);
}
/// Ends a single task.
///
/// The task is asked to terminate (forced when the grace period is 0). If it
/// has terminated, or no process object exists for it, the task is removed
/// from the job/task table right away. Otherwise a grace-period thread
/// (GracePeriodElapsed) is started that force-kills the task after
/// `args.TaskCancelGracePeriodSeconds` seconds and reports completion to
/// `callbackUri`.
///
/// @param args         Job id, task id and the cancel grace period in seconds.
/// @param callbackUri  URI handed to the grace-period thread for the
///                     completion report.
/// @return An already-completed task holding the task's json, or a null json
///         value when the task is not in the job/task table.
pplx::task<json::value> RemoteExecutor::EndTask(hpc::arguments::EndTaskArgs&& args, std::string&& callbackUri)
{
    ReaderLock readerLock(&this->lock);
    Logger::Info(args.JobId, args.TaskId, this->UnknowId, "EndTask: starting");

    auto taskInfo = this->jobTaskTable.GetTask(args.JobId, args.TaskId);
    json::value jsonBody;

    if (taskInfo)
    {
        Logger::Debug(
            args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(),
            "EndTask for ProcessKey {0}, processes count {1}",
            taskInfo->ProcessKey, this->processes.size());

        const auto* stat = this->TerminateTask(
            args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(),
            taskInfo->ProcessKey,
            static_cast<int>(ErrorCodes::EndTaskExitCode),
            args.TaskCancelGracePeriodSeconds == 0,
            !taskInfo->IsPrimaryTask);

        taskInfo->ExitCode = static_cast<int>(ErrorCodes::EndTaskExitCode);

        if (stat == nullptr || stat->IsTerminated())
        {
            // Nothing left running (or nothing to track): finish the task now.
            this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId());
            taskInfo->Exited = true;
            taskInfo->CancelGracefulThread();

            if (stat != nullptr)
            {
                taskInfo->AssignFromStat(*stat);
            }
        }
        else
        {
            taskInfo->Exited = false;
            taskInfo->AssignFromStat(*stat);

            // start a thread to kill the task after a period of time;
            // The tuple layout must match the one unpacked in GracePeriodElapsed.
            using GraceArgs = std::tuple<int, int, int, uint64_t, std::string, int, RemoteExecutor*>;
            std::unique_ptr<GraceArgs> graceArgs(new GraceArgs(
                taskInfo->JobId, taskInfo->TaskId, taskInfo->GetTaskRequeueCount(), taskInfo->ProcessKey,
                callbackUri, args.TaskCancelGracePeriodSeconds, this));

            const int rc = pthread_create(
                &taskInfo->GracefulThreadId, nullptr, RemoteExecutor::GracePeriodElapsed, graceArgs.get());

            if (rc == 0)
            {
                // Ownership has been handed to the new thread, which deletes the tuple.
                graceArgs.release();
            }
            else
            {
                // The thread was never created, so the tuple is freed here by
                // graceArgs instead of leaking; surface the failure in the log.
                Logger::Error(args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(),
                    "EndTask: failed to create the grace period thread, error code {0}.", rc);
            }
        }

        jsonBody = taskInfo->ToJson();
        Logger::Info(args.JobId, args.TaskId, this->UnknowId, "EndTask: ended {0}", jsonBody);
    }
    else
    {
        Logger::Warn(args.JobId, args.TaskId, this->UnknowId, "EndTask: Task is already finished");
    }

    return pplx::task_from_result(jsonBody);
}
// pthread entry point started by EndTask: waits for the grace period and then
// force-terminates the task and reports its completion.
//
// data: heap-allocated tuple (jobId, taskId, requeueCount, processKey,
//       callbackUri, period, executor) created by EndTask; this function takes
//       ownership and deletes it.
// Never returns normally; the thread ends through pthread_exit(nullptr).
void* RemoteExecutor::GracePeriodElapsed(void* data)
{
// Make the thread cancelable at any point.
// NOTE(review): presumably this is what lets TaskInfo::CancelGracefulThread stop
// it during the sleep below -- confirm against TaskInfo.
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
auto* ids = static_cast<std::tuple<int, int, int, uint64_t, std::string, int, RemoteExecutor*>*>(data);
int jobId, taskId, requeueCount, period;
uint64_t processKey;
std::string callbackUri;
RemoteExecutor* e;
// Copy everything out of the tuple so it can be freed before sleeping.
std::tie(jobId, taskId, requeueCount, processKey, callbackUri, period, e) = *ids;
delete ids;
sleep(period);
// The executor's writer lock is taken only after the grace period has elapsed.
WriterLock writerLock(&e->lock);
Logger::Info(jobId, taskId, e->UnknowId, "GracePeriodElapsed: starting");
auto taskInfo = e->jobTaskTable.GetTask(jobId, taskId);
if (taskInfo)
{
// Forced kill (forced == true); never treated as an MPI docker task here.
const auto* stat = e->TerminateTask(
jobId, taskId, requeueCount,
processKey,
(int)ErrorCodes::EndTaskExitCode,
true,
false);
if (stat != nullptr)
{
Logger::Debug(jobId, taskId, requeueCount, "remaining pids size {0}", stat->ProcessIds.size());
// Debug mode only: dump each remaining pid and the content of the task's
// cgroup tasks file.
if (NodeManagerConfig::GetDebug())
{
for (int pid : stat->ProcessIds)
{
std::string process;
std::string groupFile = "/sys/fs/cgroup/cpu,cpuacct/nmgroup_";
groupFile = String::Join("", groupFile, "Task_", taskId, "_", requeueCount, "/tasks");
System::ExecuteCommandOut(process, "ps -p", pid);
Logger::Debug(jobId, taskId, requeueCount, "undead process {1}, {0}", process, pid);
System::ExecuteCommandOut(process, "cat", groupFile);
Logger::Debug(jobId, taskId, requeueCount, "tasks file {0}", process);
}
}
// stat == nullptr (in which case this branch is skipped) means the processKey
// is already removed from the map, which means the main task has exited already.
taskInfo->Exited = true;
taskInfo->ExitCode = (int)ErrorCodes::EndTaskExitCode;
taskInfo->AssignFromStat(*stat);
// Clear the process id list after the statistics were copied from stat.
taskInfo->ProcessIds.clear();
e->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId());
json::value jsonBody = taskInfo->ToCompletionEventArgJson();
Logger::Info(jobId, taskId, e->UnknowId, "EndTask: ended {0}", jsonBody);
e->ReportTaskCompletion(jobId, taskId, requeueCount, jsonBody, callbackUri);
}
}
else
{
Logger::Warn(jobId, taskId, e->UnknowId, "EndTask: Task is already finished");
}
pthread_exit(nullptr);
}
void RemoteExecutor::ReportTaskCompletion(
int jobId, int taskId, int taskRequeueCount, json::value jsonBody,
const std::string& callbackUri)
{
try
{
if (!jsonBody.is_null())
{
std::string uri = NodeManagerConfig::ResolveTaskCompletedUri(callbackUri, this->cts.get_token());
Logger::Debug(jobId, taskId, taskRequeueCount,
"Callback to {0} with {1}", uri, jsonBody);
auto client = HttpHelper::GetHttpClient(uri);
auto request = HttpHelper::GetHttpRequest(methods::POST, jsonBody);
client->request(*request).then([jobId, taskId, taskRequeueCount, uri, this](pplx::task<http_response> t)
{
try
{
auto response = t.get();
Logger::Info(jobId, taskId, taskRequeueCount,
"Callback to {0} response code {1}", uri, response.status_code());
if (response.status_code() != status_codes::OK)
{
this->ResyncAndInvalidateCache();
}
}
catch (const std::exception& ex)
{
this->ResyncAndInvalidateCache();
Logger::Error(jobId, taskId, taskRequeueCount,
"Exception when sending back task result. {0}", ex.what());
}
});
}
}
catch (const std::exception& ex)
{
this->ResyncAndInvalidateCache();
Logger::Error(jobId, taskId, taskRequeueCount,
"Exception when sending back task result. {0}", ex.what());
}
}
void RemoteExecutor::StartHeartbeat()
{
WriterLock writerLock(&this->lock);
this->nodeInfoReporter =
std::unique_ptr<Reporter<json::value>>(
new HttpReporter(
"HeartbeatReporter",
[](pplx::cancellation_token token) { return NodeManagerConfig::ResolveHeartbeatUri(token); },
0,
this->NodeInfoReportInterval,
[this]() { this->UpdateStatistics(); return this->jobTaskTable.ToJson(); },
[this](int retryCount) {
NamingClient::InvalidateCache();
if (retryCount > 2)
{
this->jobTaskTable.RequestResync();
}
}));
this->nodeInfoReporter->Start();
}
/// Refreshes the statistics of every task known to the JobTaskTable singleton
/// from the cgroup statistics of its process object.
///
/// Tasks without a process object in `processes` are logged with a warning.
/// The reader lock is taken separately for each task.
void RemoteExecutor::UpdateStatistics()
{
    Logger::Info("Update tasks' statistics.");

    auto* taskTable = JobTaskTable::GetInstance();
    if (taskTable == nullptr)
    {
        return;
    }

    auto allTasks = taskTable->GetAllTasks();
    for (const auto& task : allTasks)
    {
        ReaderLock readerLock(&this->lock);

        auto found = this->processes.find(task->ProcessKey);
        if (found == this->processes.end())
        {
            Logger::Warn(task->JobId, task->TaskId, task->GetTaskRequeueCount(), "No process object found when updating task statistics.");
            continue;
        }

        task->AssignFromStat(found->second->GetStatisticsFromCGroup());
    }
}
void RemoteExecutor::StartHostsManager()
{
std::string hostsUri = NodeManagerConfig::GetHostsFileUri();
if (!hostsUri.empty())
{
int interval = this->DefaultHostsFetchInterval;
try
{
interval = NodeManagerConfig::GetHostsFetchInterval();
}
catch (...)
{
// The Hosts Fetch interval may be not specified, just use the default interval in this case.
Logger::Info("HostsFetchInterval not specified or invalid, use the default interval {0} seconds.", interval);
}
if (interval < MinHostsFetchInterval)
{
Logger::Info("HostsFetchInterval {0} is less than minimum interval {1}, use the minimum interval.", interval, MinHostsFetchInterval);
interval = MinHostsFetchInterval;
}
WriterLock writerLock(&this->lock);
this->hostsManager = std::unique_ptr<HostsManager>(new HostsManager([](pplx::cancellation_token token) { return NodeManagerConfig::ResolveHostsFileUri(token); }, interval));
this->hostsManager->Start();
}
else
{
Logger::Warn("HostsFileUri not specified, hosts manager will not be started.");
}
}
void RemoteExecutor::StartRegister()
{
WriterLock writerLock(&this->lock);
this->registerReporter =
std::unique_ptr<Reporter<json::value>>(
new HttpReporter(
"RegisterReporter",
[](pplx::cancellation_token token) { return NodeManagerConfig::ResolveRegisterUri(token); },
3,
this->RegisterInterval,
[this]() { return this->monitor.GetRegisterInfo(); },
[this](int retryCount) {
NamingClient::InvalidateCache();
if (retryCount > 2)
{
this->jobTaskTable.RequestResync();
}
}));
this->registerReporter->Start();
}
/// Handles a ping: when the callback URI differs from the stored heartbeat URI,
/// saves the new one and restarts the heartbeat reporter.
///
/// @param callbackUri  Heartbeat URI supplied by the caller.
/// @return An already-completed task holding an empty json value.
pplx::task<json::value> RemoteExecutor::Ping(std::string&& callbackUri)
{
    auto storedUri = NodeManagerConfig::GetHeartbeatUri();

    const bool uriChanged = storedUri != callbackUri;
    if (uriChanged)
    {
        NodeManagerConfig::SaveHeartbeatUri(callbackUri);
        StartHeartbeat();
    }

    json::value emptyResult;
    return pplx::task_from_result(emptyResult);
}
void RemoteExecutor::StartMetric()
{
WriterLock writerLock(&this->lock);
std::string uri = NodeManagerConfig::GetMetricUri();
if (!uri.empty())
{
auto tokens = String::Split(uri, '/');
uuid id = string_generator()(tokens[4]);
this->monitor.SetNodeUuid(id);
this->metricReporter =
std::unique_ptr<Reporter<std::vector<std::vector<unsigned char>>>>(
new UdpReporter(
"MetricReporter",
[](pplx::cancellation_token token) { return NodeManagerConfig::ResolveMetricUri(token); },
0,
this->MetricReportInterval,
[this]() { return this->monitor.GetMonitorPacketData(); },
[this](int _) { NamingClient::InvalidateCache(); }));
this->metricReporter->Start();
}
}
/// Handles a metric request: when the callback URI differs from the stored
/// metric URI, saves the new one and restarts the metric reporter.
///
/// @param callbackUri  Metric URI supplied by the caller.
/// @return An already-completed task holding an empty json value.
pplx::task<json::value> RemoteExecutor::Metric(std::string&& callbackUri)
{
    auto storedUri = NodeManagerConfig::GetMetricUri();

    const bool uriChanged = storedUri != callbackUri;
    if (uriChanged)
    {
        NodeManagerConfig::SaveMetricUri(callbackUri);

        // callbackUri is like udp://server:port/api/nodeguid/metricreported
        Logger::Info("Start reporting metrics to {0}", callbackUri);
        StartMetric();
    }

    json::value emptyResult;
    return pplx::task_from_result(emptyResult);
}
/// Applies a metric counters configuration.
///
/// First runs the same handling as Metric() for the callback URI, then passes
/// the configuration and the executor's cancellation token to the monitor.
///
/// @param config       Metric counters configuration, moved into the monitor.
/// @param callbackUri  Metric URI, forwarded to Metric().
/// @return An already-completed task holding an empty json value.
pplx::task<json::value> RemoteExecutor::MetricConfig(
    MetricCountersConfig&& config,
    std::string&& callbackUri)
{
    // The task returned by Metric() is intentionally not used.
    Metric(std::move(callbackUri));

    monitor.ApplyMetricConfig(std::move(config), cts.get_token());

    json::value emptyResult;
    return pplx::task_from_result(emptyResult);
}
/// Terminates a task and returns its latest cgroup statistics.
///
/// For an MPI docker task the container is stopped through
/// StopMpiContainer.sh and nullptr is returned. Otherwise the process
/// registered under `processKey` is killed and polled up to 10 times, 100 ms
/// apart (about 1 second in total), until its statistics report termination.
///
/// The caller is expected to hold `lock`; this function does not take it.
///
/// @param jobId          Job id, used for logging.
/// @param taskId         Task id, used for logging and as the container script argument.
/// @param requeueCount   Requeue count, used for logging.
/// @param processKey     Key of the process in `processes`.
/// @param exitCode       Exit code passed to Process::Kill.
/// @param forced         Passed to Process::Kill.
/// @param mpiDockerTask  True to stop an MPI container instead of a process.
/// @return Pointer to the process statistics (possibly not terminated yet), or
///         nullptr for MPI docker tasks and when no process object is found.
const ProcessStatistics* RemoteExecutor::TerminateTask(
    int jobId, int taskId, int requeueCount,
    uint64_t processKey, int exitCode, bool forced, bool mpiDockerTask)
{
    if (mpiDockerTask)
    {
        std::string output;
        int ret = System::ExecuteCommandOut(output, "2>&1 /bin/bash", "StopMpiContainer.sh", taskId);
        if (ret == 0)
        {
            Logger::Info(jobId, taskId, requeueCount, "Stop MPI container successfully.");
        }
        else
        {
            Logger::Error(jobId, taskId, requeueCount, "Stop MPI container failed with exitcode {0}. {1}", ret, output);
        }

        return nullptr;
    }

    auto p = this->processes.find(processKey);
    if (p == this->processes.end())
    {
        Logger::Warn(jobId, taskId, requeueCount, "No process object found.");
        return nullptr;
    }

    Logger::Debug(jobId, taskId, requeueCount, "About to Kill the task, forced {0}.", forced);
    p->second->Kill(exitCode, forced);

    // sleep() takes whole seconds, so sleep(0.1) truncated to sleep(0) and the
    // loop never waited. usleep() provides the intended 100 ms pause.
    const int MaxPolls = 10;
    const useconds_t PollIntervalMicroseconds = 100 * 1000;

    const auto* stat = &p->second->GetStatisticsFromCGroup();
    for (int polls = 0; !stat->IsTerminated() && polls < MaxPolls; ++polls)
    {
        usleep(PollIntervalMicroseconds);
        stat = &p->second->GetStatisticsFromCGroup();
    }

    if (!stat->IsTerminated())
    {
        Logger::Warn(jobId, taskId, requeueCount,
            "The task didn't exit within 1s, process Ids {0}",
            String::Join<' '>(stat->ProcessIds));
    }

    return stat;
}
void RemoteExecutor::ResyncAndInvalidateCache()
{
this->jobTaskTable.RequestResync();
NamingClient::InvalidateCache();
}
/// Returns the current output of a running task.
///
/// The reader lock is held for the whole call: `processes` is modified under
/// the writer lock by StartTask and by the process completion callback, so
/// looking up and using the process object without the lock would race with
/// its removal.
///
/// @param args  Job id and task id of the task to peek.
/// @return An already-completed task holding a json string: the peeked output,
///         an empty string when the task or its process is not found, or a
///         fixed failure message when peeking throws.
pplx::task<json::value> RemoteExecutor::PeekTaskOutput(hpc::arguments::PeekTaskOutputArgs&& args)
{
    ReaderLock readerLock(&this->lock);
    Logger::Info(args.JobId, args.TaskId, this->UnknowId, "Peeking task output.");

    std::string output;
    try
    {
        auto taskInfo = this->jobTaskTable.GetTask(args.JobId, args.TaskId);
        if (taskInfo)
        {
            Logger::Debug(args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(),
                "PeekTaskOutput for ProcessKey {0}, processes count {1}",
                taskInfo->ProcessKey, this->processes.size());

            auto p = this->processes.find(taskInfo->ProcessKey);
            if (p != this->processes.end())
            {
                output = p->second->PeekOutput();
            }
        }
    }
    catch (const std::exception& ex)
    {
        // Best effort: report the failure in the returned text instead of throwing.
        Logger::Warn(args.JobId, args.TaskId, this->UnknowId, "Exception when peeking task output: {0}", ex.what());
        output = "NodeManager: Failed to get the output.";
    }

    return pplx::task_from_result(json::value::string(output));
}