nodemanager/core/Process.h (140 lines of code) (raw):
#ifndef PROCESS_H
#define PROCESS_H
#include <string>
#include <sstream>
#include <vector>
#include <map>
#include <memory>
#include <unistd.h>
#include <sys/signal.h>
#include <pplx/pplxtasks.h>
#include <sys/wait.h>
#include <sys/resource.h>
#include <boost/algorithm/string.hpp>
#include "../utils/String.h"
#include "../utils/Logger.h"
#include "../utils/System.h"
#include "../common/ErrorCodes.h"
#include "../data/ProcessStatistics.h"
using namespace hpc::utils;
namespace hpc
{
namespace core
{
class Process
{
public:
typedef void Callback(
int,
std::string&&,
const hpc::data::ProcessStatistics& stat);
Process(
int jobId,
int taskId,
int requeueCount,
const std::string& taskExecutionName,
const std::string& cmdLine,
const std::string& standardOut,
const std::string& standardErr,
const std::string& standardIn,
const std::string& workDir,
const std::string& user,
bool dumpStdoutToExecutionMessage,
std::vector<uint64_t>&& cpuAffinity,
std::map<std::string, std::string>&& envi,
const std::function<Callback> completed);
Process(Process&&) = default;
virtual ~Process();
pplx::task<std::pair<pid_t, pthread_t>> Start(std::shared_ptr<Process> self);
void Kill(int forcedExitCode = 0x0FFFFFFF, bool forced = true);
const hpc::data::ProcessStatistics& GetStatisticsFromCGroup();
static void Cleanup();
pplx::task<void> OnCompleted();
int GetExitCode() const { return this->exitCode; }
std::string GetExecutionMessage() const { return this->message.str(); }
void SetSelfPtr(std::shared_ptr<Process> self) { this->selfPtr.swap(self); }
void ResetSelfPtr() { this->selfPtr.reset(); }
std::string PeekOutput();
protected:
private:
static bool StartWithHttpOrHttps(const std::string& path)
{
return boost::algorithm::starts_with(path, "http://") || boost::algorithm::starts_with(path, "https://");
}
void SetExitCode(int exitCode)
{
this->exitCode = exitCode;
this->exitCodeSet = true;
}
int CreateTaskFolder();
template <typename ... Args>
int ExecuteCommand(const std::string& cmd, const Args& ... args)
{
std::string output;
std::string cmdLine = String::Join(" ", cmd, args...);
int ret = System::ExecuteCommandOut(output, cmd, args...);
if (ret != 0)
{
this->SetExitCode(ret);
this->message
<< "Task " << this->taskId << ": '" << cmdLine
<< "', exitCode " << ret << ". output "
<< output << std::endl;
}
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "'{0}', exitCode {1}, output {2}.", cmdLine, ret, output);
return ret;
}
template <typename ... Args>
int ExecuteCommandNoCapture(const std::string& cmd, const Args& ... args)
{
std::string output;
std::string cmdLine = String::Join(" ", cmd, args...);
int ret = System::ExecuteCommandOut(output, cmd, args...);
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "'{0}', exitCode {1}, output {2}.", cmdLine, ret, output);
return ret;
}
static void* ForkThread(void*);
std::string GetAffinity();
void Run(const std::string& path);
static void* ReadPipeThread(void* p);
void SendbackOutput(const std::string& uri, const std::string& output, int order) const;
void Monitor();
std::string BuildScript();
std::unique_ptr<const char* []> PrepareEnvironment();
void OnCompletedInternal();
std::ostringstream stdOut;
std::ostringstream stdErr;
std::ostringstream message;
int exitCode = (int)hpc::common::ErrorCodes::DefaultExitCode;
bool exitCodeSet = false;
hpc::data::ProcessStatistics statistics;
std::string taskFolder;
const int jobId;
const int taskId;
const int requeueCount;
const std::string taskExecutionId;
const std::string commandLine;
std::string stdOutFile;
std::string stdErrFile;
const std::string stdInFile;
const std::string workDirectory;
const std::string userName;
bool dumpStdout = false;
const std::vector<uint64_t> affinity;
const std::map<std::string, std::string> environments;
std::vector<std::string> environmentsBuffer;
bool streamOutput = false;
int stdoutPipe[2];
const std::function<Callback> callback;
std::shared_ptr<Process> selfPtr;
pthread_t threadId = 0;
pthread_t outputThreadId = 0;
pid_t processId;
bool ended = false;
pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER;
pplx::task_completion_event<std::pair<pid_t, pthread_t>> started;
pplx::task_completion_event<void> completed;
};
}
}
#endif // PROCESS_H