nodemanager/core/Monitor.cpp (598 lines of code) (raw):

#include <pthread.h> #include <boost/range/algorithm.hpp> #include <boost/range/adaptors.hpp> #include <boost/phoenix.hpp> #include "Monitor.h" #include "../utils/ReaderLock.h" #include "../utils/WriterLock.h" #include "../utils/Logger.h" #include "../utils/System.h" #include "JobTaskTable.h" #include "NodeManagerConfig.h" #include "../Version.h" using namespace hpc::core; using namespace hpc::utils; using namespace hpc::data; using namespace hpc::arguments; using namespace boost::phoenix::arg_names; Monitor::Monitor(const std::string& nodeName, const std::string& netName, int interval) : name(nodeName), networkName(netName), lock(PTHREAD_RWLOCK_INITIALIZER), intervalSeconds(interval), isCollected(false) { InitializeGpuDriver(); this->collectors["\\Processor\\% Processor Time"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (instanceName == "_Total") { return this->cpuUsage; } else { Logger::Warn("Unable to collect {0} for \\Processor\\% Processor Time", instanceName); return 0.0f; } }); this->collectors["\\Memory\\Pages/sec"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { return this->pagesPerSec; }); this->collectors["\\Memory\\Available MBytes"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { return this->availableMemoryMb; }); this->collectors["\\System\\Context switches/sec"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { return this->contextSwitchesPerSec; }); this->collectors["\\System\\System Calls/sec"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (NodeManagerConfig::GetDebug()) { Logger::Warn("Unable to collect {0} for \\System\\System Calls/sec", instanceName); } return 0.0f; }); this->collectors["\\PhysicalDisk\\Disk Bytes/sec"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (instanceName != "_Total") { Logger::Warn("Unable to collect {0} for \\PhysicalDisk\\Disk Bytes/sec", instanceName); } return this->bytesPerSecond; }); this->collectors["\\LogicalDisk\\Avg. Disk Queue Length"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (instanceName != "_Total") { Logger::Warn("Unable to collect {0} for \\LogicalDisk\\Avg. Disk Queue Length", instanceName); } return this->queueLength; }); this->collectors["\\Node Manager\\Number of Cores in use"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { float coresInUse = 0.0f; auto* table = JobTaskTable::GetInstance(); if (table != nullptr) { coresInUse = table->GetCoresInUse(); } return coresInUse; }); this->collectors["\\Node Manager\\Number of Running Jobs"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { float runningJobs = 0.0f; auto* table = JobTaskTable::GetInstance(); if (table != nullptr) { runningJobs = table->GetJobCount(); } return runningJobs; }); this->collectors["\\Node Manager\\Number of Running Tasks"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { float runningTasks = 0.0f; auto* table = JobTaskTable::GetInstance(); if (table != nullptr) { runningTasks = table->GetTaskCount(); } return runningTasks; }); this->collectors["\\LogicalDisk\\% Free Space"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (instanceName == "_Total" || instanceName.empty()) { return this->freeSpacePercent; } else { Logger::Warn("Unable to collect {0} for \\LogicalDisk\\% Free Space", instanceName); return 0.0f; } }); this->collectors["\\Network Interface\\Bytes Total/sec"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (instanceName == "_Total" || instanceName.empty()) { float total = 0; for (const auto & pair : this->networkUsage) { total += (float)pair.second; } return total; } else if (this->networkUsage.find(instanceName) != this->networkUsage.end()) { return (float)this->networkUsage[instanceName]; } else { // handle network interface names with format "<link name>@<peer interface index>", like "eth0@if2" auto pos = instanceName.find('@'); if (pos != std::string::npos) { auto subStr = instanceName.substr(0, pos); if (this->networkUsage.find(subStr) != this->networkUsage.end()) { return (float)this->networkUsage[subStr]; } } Logger::Warn("Unable to collect {0} for \\Network Interface\\Bytes Total/sec", instanceName); return 0.0f; } }, [](const std::string& instanceFilter) { auto instanceNames = System::GetIbDevices(); for (const auto & netInfo : System::GetNetworkInfo()) { instanceNames.push_back(std::get<0>(netInfo)); } return GetFilteredInstanceNames(instanceNames, instanceFilter); }); if (this->gpuInitRet == 0) { auto gpuInstanceNamesFunc = [this](const std::string& instanceFilter) { auto instanceNames = this->gpuInfo.GetGpuInstanceNames(); return GetFilteredInstanceNames(instanceNames, instanceFilter); }; this->collectors["\\GPU\\GPU Time (%)"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (instanceName == "_Total" || instanceName.empty()) { return this->gpuInfo.GetGpuUtilization(); } else { auto index = String::ConvertTo<size_t>(instanceName); if (index >= 0 && index < this->gpuInfo.GpuInfos.size()) { float v = this->gpuInfo.GpuInfos[index].GpuUtilization; //Logger::Debug("\\GPU\\GPU Time (%), for index {0} is {1}", index, v); return v; } else { Logger::Warn("Collect \\GPU\\GPU Time (%) for instance {0}, index {1}, invalid index", instanceName, index); return 0.0f; } } }, gpuInstanceNamesFunc); this->collectors["\\GPU\\GPU Fan Speed (%)"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (instanceName == "_Total" || instanceName.empty()) { return this->gpuInfo.GetFanPercentage(); } else { auto index = String::ConvertTo<size_t>(instanceName); if (index >= 0 && index < this->gpuInfo.GpuInfos.size()) { return this->gpuInfo.GpuInfos[index].FanPercentage; } else { Logger::Warn("Collect \\GPU\\GPU Fan Speed (%) for instance {0}, index {1}, invalid index", instanceName, index); return 0.0f; } } }, gpuInstanceNamesFunc); this->collectors["\\GPU\\GPU Memory Usage (%)"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (instanceName == "_Total" || instanceName.empty()) { return this->gpuInfo.GetUsedMemoryPercentage(); } else { auto index = String::ConvertTo<size_t>(instanceName); if (index >= 0 && index < this->gpuInfo.GpuInfos.size()) { return this->gpuInfo.GpuInfos[index].GetUsedMemoryPercentage(); } else { Logger::Warn("Collect \\GPU\\GPU Memory Usage (%) for instance {0}, index {1}, invalid index", instanceName, index); return 0.0f; } } }, gpuInstanceNamesFunc); this->collectors["\\GPU\\GPU Memory Used (MB)"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (instanceName == "_Total" || instanceName.empty()) { // Get GPU Time; return this->gpuInfo.GetUsedMemoryMB(); } else { auto index = String::ConvertTo<size_t>(instanceName); if (index >= 0 && index < this->gpuInfo.GpuInfos.size()) { return this->gpuInfo.GpuInfos[index].UsedMemoryMB; } else { Logger::Warn("Collect \\GPU\\GPU Memory Used (MB) for instance {0}, index {1}, invalid index", instanceName, index); return 0.0f; } } }, gpuInstanceNamesFunc); this->collectors["\\GPU\\GPU Power Usage (Watts)"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (instanceName == "_Total" || instanceName.empty()) { return this->gpuInfo.GetPowerWatt(); } else { auto index = String::ConvertTo<size_t>(instanceName); if (index >= 0 && index < this->gpuInfo.GpuInfos.size()) { return this->gpuInfo.GpuInfos[index].PowerWatt; } else { Logger::Warn("Collect \\GPU\\GPU Power Usage (Watts) for instance {0}, index {1}, invalid index", instanceName, index); return 0.0f; } } }, gpuInstanceNamesFunc); this->collectors["\\GPU\\GPU SM Clock (MHz)"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (instanceName == "_Total" || instanceName.empty()) { // Get GPU Time; return this->gpuInfo.GetCurrentSMClock(); } else { auto index = String::ConvertTo<size_t>(instanceName); if (index >= 0 && index < this->gpuInfo.GpuInfos.size()) { return this->gpuInfo.GpuInfos[index].CurrentSMClock; } else { Logger::Warn("Collect \\GPU\\GPU SM Clock (MHz) for instance {0}, index {1}, invalid index", instanceName, index); return 0.0f; } } }, gpuInstanceNamesFunc); this->collectors["\\GPU\\GPU Temperature (degrees C)"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName) { if (instanceName == "_Total" || instanceName.empty()) { // Get GPU Time; return this->gpuInfo.GetTemperature(); } else { auto index = String::ConvertTo<size_t>(instanceName); if (index >= 0 && index < this->gpuInfo.GpuInfos.size()) { return this->gpuInfo.GpuInfos[index].Temperature; } else { Logger::Warn("Collect \\GPU\\GPU Temperature for instance {0}, index {1}, invalid index", instanceName, index); return 0.0f; } } }, gpuInstanceNamesFunc); } InitializeMetadataRequester(); int result = pthread_create(&this->threadId, nullptr, MonitoringThread, this); if (result != 0) Logger::Error("Create monitoring thread result {0}, errno {1}", result, errno); } Monitor::~Monitor() { if (this->threadId != 0) { // todo: graceful exit the thread. pthread_cancel(this->threadId); pthread_join(this->threadId, nullptr); } pthread_rwlock_destroy(&this->lock); } void Monitor::SetNodeUuid(const uuid& id) { this->packet.Uuid.AssignFrom(id); } void Monitor::ApplyMetricConfig(MetricCountersConfig&& config, pplx::cancellation_token token) { WriterLock writerLock(&this->lock); for_each(this->collectors.begin(), this->collectors.end(), [] (auto& kvp) { kvp.second->Reset(); }); for (auto& counter : config.MetricCounters) { if (!this->EnableMetricCounter(counter, token)) { Logger::Debug("Disabled counter MetricId {0}, InstanceId {1}, InstanceName {2} Path {3}", counter.MetricId, counter.InstanceId, counter.InstanceName, counter.Path); } else { Logger::Debug("Enabled counter MetricId {0}, InstanceId {1}, InstanceName {2} Path {3}", counter.MetricId, counter.InstanceId, counter.InstanceName, counter.Path); } } } bool Monitor::EnableMetricCounter(const MetricCounter& counterConfig, pplx::cancellation_token token) { auto collector = this->collectors.find(counterConfig.Path); if (collector != this->collectors.end()) { collector->second->ApplyConfig(counterConfig, token); return true; } return false; } std::vector<std::vector<unsigned char>> Monitor::GetMonitorPacketData() { const size_t MaxPacketSize = 1024; std::vector<std::vector<unsigned char>> packets; ReaderLock readerLock(&this->lock); if (this->isCollected) { // this->packet.Count = std::count_if(this->collectors.begin(), this->collectors.end(), [] (auto& kvp) { return kvp.second->IsEnabled(); }); this->packet.TickCount = this->intervalSeconds; this->packet.ClearData(); int p = 0; if (NodeManagerConfig::GetDebug()) { Logger::Debug("Start get package data"); } for (auto& c : this->collectors) { if (c.second->IsEnabled()) { auto values = c.second->CollectValues(); for (auto& v : values) { if (NodeManagerConfig::GetDebug()) { Logger::Debug("Report p={0}, value={1}, metricId={2}, instanceId={3}", p, v.first, v.second.MetricId, v.second.InstanceId); } this->packet.Umids[p] = v.second; this->packet.Values[p] = v.first; p ++; if (p >= MaxCountersInPacket) { this->packet.Count = p; packets.push_back(this->packet.ToByteArray(MaxPacketSize)); this->packet.ClearData(); p = 0; } } } } if (p > 0) { this->packet.Count = p; packets.push_back(this->packet.ToByteArray(MaxPacketSize)); } } return std::move(packets); } json::value Monitor::GetRegisterInfo() { ReaderLock lock(&this->lock); if (!this->isCollected) { return json::value::null(); } json::value j; j["NodeName"] = json::value::string(this->name); j["Time"] = json::value::string(this->metricTime); j["IpAddress"] = json::value::string(this->ipAddress); j["CoreCount"] = this->coreCount; j["SocketCount"] = this->socketCount; j["MemoryMegabytes"] = this->totalMemoryMb; j["DistroInfo"] = json::value::string(this->distroInfo); std::vector<json::value> networkValues; for (const auto& info : this->networkInfo) { json::value v; v["Name"] = json::value::string(std::get<0>(info)); v["MacAddress"] = json::value::string(std::get<1>(info)); v["IpV4"] = json::value::string(std::get<2>(info)); v["IpV6"] = json::value::string(std::get<3>(info)); v["IsIB"] = std::get<4>(info); networkValues.push_back(v); } j["NetworksInfo"] = json::value::array(networkValues); std::vector<json::value> gpuValues; for (const auto& info : this->gpuInfo.GpuInfos) { json::value v; v["Name"] = json::value::string(info.Name); v["Uuid"] = json::value::string(info.Uuid); v["PciBusDevice"] = json::value::string(info.GetPciBusDevice()); v["PciBusId"] = json::value::string(info.PciBusId); v["TotalMemory"] = info.TotalMemoryMB; v["MaxSMClock"] = info.MaxSMClock; gpuValues.push_back(v); } j["GpuInfo"] = json::value::array(gpuValues); if (!this->azureInstanceMetadata.empty()) { j["AzureInstanceMetadata"] = json::value::string(this->azureInstanceMetadata); } j["CcpVersion"] = json::value::string(Version::GetVersion()); j["CustomProperties"] = json::value::string(""); return std::move(j); } void Monitor::Run() { uint64_t cpuLast = 0, idleLast = 0; std::map<std::string, uint64_t> networkLast; int collectCount = 0; while (true) { time_t t; time(&t); uint64_t cpuCurrent = cpuLast + 1, idleCurrent = idleLast; System::CPUUsage(cpuCurrent, idleCurrent); uint64_t totalDiff = cpuCurrent - cpuLast; uint64_t idleDiff = idleCurrent - idleLast; float cpuUsage = (float)(100.0f * (totalDiff - idleDiff) / totalDiff); cpuLast = cpuCurrent; idleLast = idleCurrent; uint64_t available, total; System::Memory(available, total); float availableMemoryMb = (float)available / 1024.0f; float totalMemoryMb = (float)total / 1024.0f; float freeSpacePercent = 0.0f, queueLength = 0.0f, pagesPerSec = 0.0f, contextSwitchesPerSec = 0.0f, bytesPerSecond = 0.0f; System::FreeSpace(freeSpacePercent); System::IostatX(queueLength); System::Vmstat(pagesPerSec, contextSwitchesPerSec); System::Iostat(bytesPerSecond); // network usage auto networkUsage = System::GetNetworkUsage(); for (const auto & pair : networkUsage) { auto networkName = pair.first; auto networkCurrent = pair.second; if (networkLast.find(networkName) == networkLast.end()) { networkLast[networkName] = 0; } networkUsage[networkName] = (networkCurrent - networkLast[networkName]) / this->intervalSeconds; networkLast[networkName] = networkCurrent; } // ip address; std::string ipAddress = System::GetIpAddress(IpAddressVersion::V4, this->networkName); // cpu type; int cores, sockets; System::CPU(cores, sockets); // distro; const std::string& distro = System::GetDistroInfo(); // networks; auto netInfo = System::GetNetworkInfo(); // GPU System::GpuInfoList gpuInfo; if (this->gpuInitRet == 0) { this->gpuInitRet = System::QueryGpuInfo(gpuInfo); } auto queryMetadata = collectCount % 30 == 0; std::string metaData = ""; if (queryMetadata) { metaData = this->QueryAzureInstanceMetadata(); } { WriterLock writerLock(&this->lock); this->metricTime = ctime(&t); this->cpuUsage = cpuUsage; this->availableMemoryMb = availableMemoryMb; this->networkUsage = std::move(networkUsage); this->totalMemoryMb = totalMemoryMb; this->ipAddress = ipAddress; this->coreCount = cores; this->socketCount = sockets; this->distroInfo = distro; this->networkInfo = std::move(netInfo); this->freeSpacePercent = freeSpacePercent; this->queueLength = queueLength; this->pagesPerSec = pagesPerSec; this->contextSwitchesPerSec = contextSwitchesPerSec; this->bytesPerSecond = bytesPerSecond; if (this->gpuInitRet == 0) { if (NodeManagerConfig::GetDebug()) { Logger::Debug("Saving Gpu Info ret {0}, info count {1}", this->gpuInitRet, gpuInfo.GpuInfos.size()); } this->gpuInfo = std::move(gpuInfo); } if (queryMetadata) { this->azureInstanceMetadata = metaData; } } this->isCollected = true; sleep(this->intervalSeconds); collectCount++; } } void Monitor::InitializeGpuDriver() { Logger::Info("Check nvidia-smi and enable persistence mode for GPU."); std::string output; this->gpuInitRet = System::ExecuteCommandOut(output, "nvidia-smi -pm 1 2>/dev/null"); if (this->gpuInitRet != 0) { Logger::Warn("GPU metrics will not be collected."); } } void Monitor::InitializeMetadataRequester() { web::http::client::http_client_config config; utility::seconds timeout(1l); config.set_timeout(timeout); auto metaDataUri = NodeManagerConfig::GetAzureInstanceMetaDataUri(); if (metaDataUri.empty()) { metaDataUri = "http://169.254.169.254/metadata/instance?api-version=2017-08-01"; } this->metaDataClient = std::make_shared<web::http::client::http_client>(metaDataUri, config); this->metaDataRequest = std::make_shared<web::http::http_request>(web::http::methods::GET); this->metaDataRequest->headers().add("metadata", "true"); } std::string Monitor::QueryAzureInstanceMetadata() { std::string azureInstanceMetadata = ""; if (this->remainingRetryCount > 0) { try { auto response = this->metaDataClient->request(*this->metaDataRequest).get(); if (response.status_code() == web::http::status_codes::OK) { azureInstanceMetadata = response.extract_string().get(); Logger::Debug("Get metadata of Azure instance. {0}", azureInstanceMetadata); this->remainingRetryCount = 5; } else { Logger::Warn("Failed to query Azure node metadata. Status code: {0}. Remaining retry count: {1}.", response.status_code(), --this->remainingRetryCount); } } catch (const std::exception& ex) { Logger::Warn("Exception when querying Azure node metadata. {0}. Remaining retry count: {1}.", ex.what(), --this->remainingRetryCount); } } return std::move(azureInstanceMetadata); } void* Monitor::MonitoringThread(void* arg) { pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); Monitor* m = static_cast<Monitor*>(arg); Logger::Info("Monitoring thread created. Interval {0}", m->intervalSeconds); m->Run(); pthread_exit(nullptr); } std::vector<std::string> Monitor::GetFilteredInstanceNames(const std::vector<std::string> & instanceNames, const std::string & instanceFilter) { if (instanceFilter.empty()) { return std::move(instanceNames); } else { std::vector<std::string> filteredInstanceNames; std::copy_if(instanceNames.begin(), instanceNames.end(), std::back_inserter(filteredInstanceNames), [instanceFilter](std::string s){return String::AsteriskMatch(s, instanceFilter);}); return std::move(filteredInstanceNames); } }