nodemanager/utils/System.cpp (601 lines of code) (raw):
#include <string>
#include <sys/types.h>
#include <ifaddrs.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <iostream>
#include <sstream>
#include <fstream>
#include <unistd.h>
#include <set>
#include "System.h"
#include "String.h"
#include "Logger.h"
#include "../common/ErrorCodes.h"
using namespace hpc::utils;
using namespace hpc::common;
std::vector<System::NetInfo> System::GetNetworkInfo()
{
std::vector<System::NetInfo> info;
std::string rawData;
int ret = System::ExecuteCommandOut(rawData, "ip", "addr");
if (ret == 0)
{
std::string name, mac, ipV4, ipV6;
bool isIB = false;
std::istringstream iss(rawData);
std::string line;
while (getline(iss, line))
{
std::istringstream lineStream(line);
if (line.empty()) continue;
if (line[0] >= '0' && line[0] <= '9')
{
if (!name.empty())
{
info.push_back(System::NetInfo(name, mac, ipV4, ipV6, isIB));
name.clear();
mac.clear();
ipV4.clear();
ipV6.clear();
isIB = false;
}
lineStream >> name >> name;
if (name.length() > 0)
{
name = name.substr(0, name.length() - 1);
}
// temporarily identify IB
if (name == "ib0")
{
isIB = true;
}
continue;
}
std::string head, value;
lineStream >> head >> value;
if (head.compare(0, 4, "link") == 0)
{
mac = value;
}
else if (head == "inet")
{
ipV4 = value;
}
else if (head == "inet6")
{
ipV6 = value;
}
}
if (!name.empty())
{
info.push_back(System::NetInfo(name, mac, ipV4, ipV6, isIB));
}
}
return std::move(info);
}
std::vector<std::string> System::GetIbDevices()
{
std::map<std::string, uint64_t> networkUsage;
System::IbNetworkUsage(networkUsage, true);
std::vector<std::string> devices;
for (auto const& element : networkUsage)
{
devices.push_back(element.first);
}
return std::move(devices);
}
std::string System::GetIpAddress(IpAddressVersion version, const std::string& name)
{
ifaddrs *ifAddr = nullptr;
bool isV4 = version == IpAddressVersion::V4;
const int Family = isV4 ? AF_INET : AF_INET6;
getifaddrs(&ifAddr);
std::string ip;
for (ifaddrs *i = ifAddr; i != nullptr; i = i->ifa_next)
{
if (!i->ifa_addr) continue;
if (i->ifa_addr->sa_family == Family)
{
void* tmp = isV4 ?
(void*)&((sockaddr_in*)i->ifa_addr)->sin_addr :
(void*)&((sockaddr_in6*)i->ifa_addr)->sin6_addr;
if ((name.empty() && std::string(i->ifa_name) == "lo:") ||
std::string(i->ifa_name) != name)
{
continue;
}
const int BufferLength = isV4 ? INET_ADDRSTRLEN : INET6_ADDRSTRLEN;
char buffer[BufferLength];
inet_ntop(Family, tmp, buffer, BufferLength);
ip = buffer;
// Logger::Debug("Found IPAddress {0}, name {1}", ip, name);
}
}
if (ifAddr != nullptr) freeifaddrs(ifAddr);
return std::move(ip);
}
void System::CPUUsage(uint64_t &total, uint64_t &idle)
{
try
{
std::ifstream fs("/proc/stat", std::ios::in);
std::string cpu;
uint64_t user, nice, sys, iowait, irq, softirq;
fs >> cpu >> user >> nice >> sys >> idle >> iowait >> irq >> softirq;
fs.close();
total = user + nice + sys + idle + iowait + irq + softirq;
}
catch (std::exception& ex)
{
Logger::Error("CPUUsage get exception {0}", ex.what());
}
}
void System::Memory(uint64_t &availableKb, uint64_t &totalKb)
{
std::ifstream fs("/proc/meminfo", std::ios::in);
std::string name, unit;
fs >> name >> totalKb >> unit;
fs >> name >> availableKb >> unit;
fs.close();
}
void System::CPU(int &cores, int &sockets)
{
std::ifstream fs("/proc/cpuinfo", std::ios::in);
std::string name, unit;
std::set<std::string> physicalIds;
std::set<std::string> coreIds;
std::string line;
while (getline(fs, line))
{
auto tokens = String::Split(line, ':');
if (tokens.size() >= 2)
{
if (String::Trim(tokens[0]) == "physical id")
{
physicalIds.insert(tokens[1]);
}
else if (String::Trim(tokens[0]) == "processor")
{
coreIds.insert(tokens[1]);
}
}
}
cores = coreIds.size();
sockets = physicalIds.size();
sockets = (sockets > 0)? sockets : 1;
// Logger::Debug("Detected core count {0}, socket count {1}", cores, sockets);
fs.close();
}
int System::Vmstat(float &pagesPerSec, float &contextSwitchesPerSec)
{
std::string output;
int ret = System::ExecuteCommandOut(output, "vmstat");
if (ret == 0)
{
std::istringstream iss(output);
iss.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
iss.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
int r, b, swpd, free, buff, cache, si, so, bi, bo, in, cs, us, sy, id, wa, st;
iss >> r >> b >> swpd >> free >> buff >> cache >> si >> so >> bi >> bo
>> in >> cs >> us >> sy >> id >> wa >> st;
pagesPerSec = si + so;
contextSwitchesPerSec = cs;
}
return ret;
}
int System::Iostat(float &bytesPerSecond)
{
std::string output;
int ret = System::ExecuteCommandOut(output, "iostat -dk");
if (ret == 0)
{
std::istringstream iss(output);
std::string device;
while (device != "Device:" && iss.good())
{
iss >> device;
iss.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
}
float tps, read, write;
iss >> device >> tps >> read >> write;
bytesPerSecond = (read + write) * 1024;
}
return ret;
}
int System::IostatX(float &queueLength)
{
std::string output;
int ret = System::ExecuteCommandOut(output, "iostat -x");
if (ret == 0)
{
std::istringstream iss(output);
std::string device;
while (device != "Device:" && iss.good())
{
iss >> device;
iss.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
}
float rrqm, wrqm, r, w, rkb, wkb, avgrq, avgqu;
iss >> device >> rrqm >> wrqm >> r >> w >> rkb >> wkb >> avgrq >> avgqu;
queueLength = avgqu;
}
return ret;
}
int System::FreeSpace(float &freeSpacePercent)
{
std::string output;
int ret = System::ExecuteCommandOut(output, "df -k");
if (ret == 0)
{
std::istringstream iss(output);
iss.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
std::string mountPoint;
while (mountPoint != "/" && iss.good())
{
std::string tmp;
iss >> tmp >> tmp >> tmp >> tmp >> freeSpacePercent >> tmp >> mountPoint;
}
if (mountPoint != "/")
{
ret = 1;
}
else
{
freeSpacePercent = 100 - freeSpacePercent;
}
}
return ret;
}
std::map<std::string, uint64_t> System::GetNetworkUsage()
{
std::map<std::string, uint64_t> networkUsage;
bool collected = false;
std::ifstream fs("/proc/net/dev", std::ios::in);
std::string name;
uint64_t receive, send, tmp;
fs.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
fs.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
while (fs.good())
{
std::getline(fs, name, ':');
name = String::Trim(name);
fs >> receive >> tmp >> tmp >> tmp >> tmp >> tmp >> tmp >> tmp >> send;
networkUsage[name] = receive + send;
collected = true;
fs.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
}
fs.close();
if (!collected)
{
Logger::Error("Error occurred while collecting network usage from /proc/net/dev");
}
System::IbNetworkUsage(networkUsage);
return std::move(networkUsage);
}
void System::IbNetworkUsage(std::map<std::string, uint64_t> & networkUsage, bool logFailure)
{
std::string output;
System::ExecuteCommandOut(output, "ls /sys/class/infiniband 2>/dev/null; :");
auto devices = String::Split(String::Trim(output), '\n');
for (const auto & device : devices)
{
std::vector<std::string> counters {"port_rcv_data", "port_xmit_data"};
int factor = 4;
uint64_t usage = 0;
bool succeed = true;
for (const auto & counter : counters)
{
std::string path = String::Join("", "/sys/class/infiniband/", device, "/ports/1/counters/", counter);
std::string output;
System::ExecuteCommandOut(output, String::Join(" ", "head -n1", path, "2>/dev/null; :"));
if (!output.empty())
{
uint64_t value;
std::istringstream iss(output);
iss >> value;
usage += value * factor;
}
else
{
succeed = false;
if (logFailure)
{
Logger::Warn("Failed to get IB network usage from {0}", path);
}
}
}
if (succeed)
{
networkUsage[device] = usage;
}
}
}
const std::string& System::GetNodeName()
{
static std::string nodeName;
if (nodeName.empty())
{
char buffer[256];
if (-1 == gethostname(buffer, 255))
{
Logger::Error("gethostname failed with errno {0}", errno);
exit((int)ErrorCodes::GetHostNameError);
}
nodeName = buffer;
std::transform(
nodeName.begin(),
nodeName.end(),
nodeName.begin(),
::toupper);
nodeName = String::Split(nodeName, '.')[0];
}
return nodeName;
}
const std::string& System::GetDistroInfo()
{
static std::string distroInfo;
if (distroInfo.empty())
{
int ret = System::ExecuteCommandOut(distroInfo, "cat", "/proc/version");
if (ret != 0)
{
Logger::Error("cat /proc/version error code {0} {1}", ret, distroInfo);
}
}
return distroInfo;
}
int System::QueryGpuInfo(System::GpuInfoList& gpuInfo)
{
std::string gpuInfoString;
int ret = System::ExecuteCommandOut(
gpuInfoString,
"nvidia-smi",
"--format=csv,noheader",
"--query-gpu=name,uuid,pci.bus_id,pci.device_id,memory.total,clocks.max.sm,fan.speed,memory.used,power.draw,clocks.current.sm,temperature.gpu,utilization.gpu");
if (127 == ret)
{
Logger::Warn("No nvidia-smi command found.");
return ret;
}
std::vector<std::string> gpuStrings = String::Split(gpuInfoString, '\n');
gpuInfo.GpuInfos.clear();
for (auto s : gpuStrings)
{
auto values = String::Split(s, ',');
System::GpuInfo info;
info.Name = String::Trim(values[0]);
info.Uuid = String::Trim(values[1]);
info.PciBusId = String::Trim(values[2]);
info.DeviceId = String::Trim(values[3]);
info.TotalMemoryMB = String::ConvertTo<float>(values[4]);
info.MaxSMClock = String::ConvertTo<float>(values[5]);
info.FanPercentage = String::ConvertTo<float>(values[6]);
info.UsedMemoryMB = String::ConvertTo<float>(values[7]);
info.PowerWatt = String::ConvertTo<float>(values[8]);
info.CurrentSMClock = String::ConvertTo<float>(values[9]);
info.Temperature = String::ConvertTo<float>(values[10]);
info.GpuUtilization = String::ConvertTo<float>(values[11]);
gpuInfo.GpuInfos.push_back(info);
}
return ret;
}
int System::CreateUser(
const std::string& userName,
const std::string& password,
bool isAdmin)
{
std::string output;
int ret = System::ExecuteCommandOut(output, "useradd", userName, "-m", "-s /bin/bash");
if (ret == 0)
{
std::string input = String::Join("", password, "\n", password, "\n");
ret = System::ExecuteCommandIn(input, "passwd", userName);
if (ret != 0)
{
Logger::Error("passwd {0} error code {1}", userName, ret);
return ret;
}
if (isAdmin)
{
ret = System::ExecuteCommandOut(output, "usermod", "-aG sudo", userName); // add user to group sudo on Ubuntu
if (ret == 6) // group sudo does not exist
{
ret = System::ExecuteCommandOut(output, "usermod", "-aG wheel", userName); // add user to group wheel on CentOS/Redhat/Suse
}
if (ret != 0)
{
Logger::Error("Add user {0} to group sudo/wheel failed. Command usermod, error code {1}", userName, ret);
}
}
}
else
{
Logger::Warn("useradd {0} -m error code {1}", userName, ret);
}
return ret;
}
int System::GetHomeDir(const std::string& userName, std::string& homeDir)
{
std::string folder = String::Join("", "~", userName);
int ret = System::ExecuteCommandOut(
homeDir,
"echo -n",
folder);
if (0 != ret || homeDir.find('~') != homeDir.npos)
{
Logger::Error("Cannot find home folder for user {0}", userName);
return ret == 0 ? (int)ErrorCodes::CannotFindHomeDir : ret;
}
return ret;
}
int System::AddSshKey(
const std::string& userName,
const std::string& key,
const std::string& fileName,
const std::string& filePermission,
std::string& filePath)
{
std::string output;
int ret = System::GetHomeDir(userName, output);
if (0 != ret) { return ret; }
std::string homeDir = output;
std::string sshFolder = String::Join("", homeDir, "/.ssh/");
Logger::Debug("User {0}'s ssh folder {1}", userName, sshFolder);
ret = System::ExecuteCommandOut(
output,
"[ -d ", homeDir, " ] || (mkdir -p ", homeDir, " && chown ", userName, " ", homeDir, ")",
" && [ -d ", sshFolder, " ] || mkdir ", sshFolder,
" && chown ", userName, " ", sshFolder, " && chmod 700 ", sshFolder);
if (ret != 0)
{
Logger::Info("Cannot create folder {0}, error code {1}", sshFolder, ret);
return ret;
}
filePath = String::Join("", sshFolder, fileName);
if (!key.empty())
{
std::ifstream test(filePath);
// won't overwrite existing user's private key
if (!test.good())
{
std::ofstream keyFile(filePath, std::ios::trunc);
keyFile << key;
keyFile.close();
ret = System::ExecuteCommandOut(output, "chown", userName, filePath, "&& chmod", filePermission, filePath);
if (0 != ret)
{
Logger::Error("Error when change the file {0}'s permission to {1}, ret {2}", filePath, filePermission, ret);
}
}
else
{
Logger::Info("File {0} exist, skip overwriting", filePath);
ret = -2;
}
test.close();
return ret;
}
return -1;
}
int System::RemoveSshKey(
const std::string& userName,
const std::string& fileName)
{
std::string output;
int ret = System::GetHomeDir(userName, output);
if (0 != ret) { return ret; }
std::string sshFolder = String::Join("", output, "/.ssh/");
auto keyFileName = String::Join("", sshFolder, fileName);
std::ifstream test(keyFileName);
// won't overwrite existing user's private key
if (test.good())
{
std::string output;
ret = System::ExecuteCommandOut(output, "rm -f", keyFileName);
}
test.close();
return ret;
}
int System::AddAuthorizedKey(
const std::string& userName,
const std::string& key,
const std::string& filePermission,
std::string& filePath)
{
std::string output;
int ret = System::GetHomeDir(userName, output);
if (0 != ret) { return ret; }
std::string sshFolder = String::Join("", output, "/.ssh/");
filePath = String::Join("", sshFolder, "authorized_keys");
std::ofstream authFile(filePath, std::ios::app);
ret = (int)ErrorCodes::WriteFileError;
if (authFile.good())
{
authFile << key;
ret = 0;
}
authFile.close();
if (0 != ret)
{
Logger::Error("Error when open the auth file {0}", filePath);
return ret;
}
ret = System::ExecuteCommandOut(output, "chown", userName, filePath, "&& chmod", filePermission, filePath);
if (0 != ret)
{
Logger::Error("Error when change the file {0}'s permission to {1}, ret {2}", filePath, filePermission, ret);
}
return ret;
}
int System::RemoveAuthorizedKey(
const std::string& userName,
const std::string& key)
{
std::string output;
int ret = System::GetHomeDir(userName, output);
if (0 != ret) { return ret; }
std::string sshFolder = String::Join("", output, "/.ssh/");
std::string authFileName = String::Join("", sshFolder, "authorized_keys");
std::ifstream authFile(authFileName, std::ios::in);
std::vector<std::string> lines;
ret = (int)ErrorCodes::WriteFileError;
if (!authFile.good())
{
authFile.close();
return ret;
}
std::string trimKey = String::Trim(key);
std::string line;
while (getline(authFile, line))
{
if (String::Trim(line) != trimKey)
{
lines.push_back(line);
}
}
authFile.close();
std::ofstream authFileOut(authFileName, std::ios::trunc);
for (auto& k : lines)
{
authFileOut << k << std::endl;
}
ret = 0;
authFileOut.close();
return ret;
}
int System::DeleteUser(const std::string& userName)
{
std::string output;
// kill all processes associated with this user.
int ret = System::ExecuteCommandOut(output, "pkill -KILL -U `id -ur", userName, "`;",
"userdel", userName, "-r -f");
if (ret != 0)
{
Logger::Error("userdel {0} error code {1}", userName, ret);
}
return ret;
}
int System::CreateTempFolder(char* folderTemplate, const std::string& userName)
{
char* p = mkdtemp(folderTemplate);
if (p)
{
std::string output;
int ret = System::ExecuteCommandOut(output, "chown -R", userName, p);
if (ret == 0)
{
ret = System::ExecuteCommandOut(output, "chmod -R 700", p);
}
return ret;
}
else
{
return errno;
}
}
int System::WriteStringToFile(const std::string& fileName, const std::string& contents)
{
std::ofstream os(fileName, std::ios::trunc);
if (!os)
{
return (int)ErrorCodes::WriteFileError;
}
os << contents;
os.close();
return 0;
}