components/otelopscol/receiver/nvmlreceiver/client.go
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build gpu
// +build gpu
package nvmlreceiver
import (
"fmt"
"strings"
"sync"
"time"
"github.com/NVIDIA/go-nvml/pkg/nvml"
"github.com/shirou/gopsutil/v3/process"
"go.uber.org/zap"
)
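// maxWarningsForFailedDeviceMetricQuery caps how many warnings are logged per device metric before further failures are suppressed.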
const maxWarningsForFailedDeviceMetricQuery = 5
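// nvmlClient wraps access to the NVIDIA Management Library (NVML) and tracks per-device state: last-seen sample timestamps, failed-query counts, and whether process accounting is enabled.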
type nvmlClient struct {
logger *zap.SugaredLogger
disable bool
handleCleanup func() error
devices []nvml.Device
devicesModelName []string
devicesUUID []string
deviceToLastSeenTimestamp map[nvml.Device]uint64
deviceMetricToFailedQueryCount map[string]uint64
deviceToAccountingIsEnabled map[nvml.Device]bool
}
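// deviceMetric is a single device-level sample; value stores either an int64 or a float64 in its 8 raw bytes via the setInt64/setFloat64 helpers.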
type deviceMetric struct {
time time.Time
gpuIndex uint
name string
value [8]byte
}
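// processMetric describes one process's lifetime GPU utilization and peak memory use on a device, as reported by NVML accounting, plus process metadata looked up via gopsutil.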
type processMetric struct {
time time.Time
gpuIndex uint
processPid int
lifetimeGpuUtilization uint64
lifetimeGpuMaxMemory uint64
processName string
command string
commandLine string
owner string
}
// Calling nvml.Init() twice causes an unnecessary error, so wrap it in sync.Once (keeping it in a var also allows mocking in tests).
var once sync.Once
var nvmlInitReturn nvml.Return
var nvmlInit = func() nvml.Return {
once.Do(func() {
nvmlInitReturn = nvml.Init()
})
return nvmlInitReturn
}
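// NVML device calls are wrapped in package-level variables so tests can substitute mocks.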
var nvmlDeviceGetSamples = nvml.DeviceGetSamples
var nvmlDeviceGetMemoryInfo = nvml.DeviceGetMemoryInfo
var nvmlDeviceSetAccountingMode = nvml.DeviceSetAccountingMode
var nvmlDeviceGetAccountingPids = nvml.DeviceGetAccountingPids
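// newClient initializes NVML and discovers the available devices. If NVML cannot be initialized, a disabled client is returned with a nil error so the receiver can run without collecting GPU metrics. When process metrics are enabled in the config, it also tries to turn on NVML process accounting for each device.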
func newClient(config *Config, logger *zap.Logger) (*nvmlClient, error) {
nvmlCleanup, err := initializeNvml(logger)
if err != nil {
logger.Sugar().Warnf("Unable to find and/or initialize Nvidia Management Library on '%w'. No Nvidia device metrics will be collected.", err)
return &nvmlClient{logger: logger.Sugar(), disable: true}, nil
}
devices, names, UUIDs, err := discoverDevices(logger)
if err != nil {
return nil, err
}
var deviceToAccountingIsEnabled map[nvml.Device]bool
if config.Metrics.NvmlGpuProcessesUtilization.Enabled || config.Metrics.NvmlGpuProcessesMaxBytesUsed.Enabled {
deviceToAccountingIsEnabled = enableProcessAccountingModeOnSupportingDevices(logger, devices)
}
return &nvmlClient{
logger: logger.Sugar(),
disable: false,
handleCleanup: nvmlCleanup,
devices: devices,
devicesModelName: names,
devicesUUID: UUIDs,
deviceToLastSeenTimestamp: make(map[nvml.Device]uint64),
deviceMetricToFailedQueryCount: make(map[string]uint64),
deviceToAccountingIsEnabled: deviceToAccountingIsEnabled,
}, nil
}
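// initializeNvml loads and initializes NVML, returning a cleanup function that shuts it down. Panics thrown by older go-nvml releases during initialization are recovered and reported as errors.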
func initializeNvml(logger *zap.Logger) (nvmlCleanup func() error, err error) {
nvmlCleanup = nil
defer func() {
// applicable to tagged releases of github.com/NVIDIA/go-nvml <= v0.11.6-0
if perr := recover(); perr != nil {
err = fmt.Errorf("%v", perr)
}
}()
ret := nvmlInit()
if ret != nvml.SUCCESS {
if ret == nvml.ERROR_LIBRARY_NOT_FOUND {
err = fmt.Errorf("libnvidia-ml.so not found")
} else {
err = fmt.Errorf("'%v'", nvml.ErrorString(ret))
}
return
}
logger.Sugar().Infof("Successfully initialized Nvidia Management Library")
printNvmlAndDriverVersion(logger)
nvmlCleanup = func() error {
ret := nvml.Shutdown()
if ret != nvml.SUCCESS {
msg := fmt.Sprintf("Unable to shutdown Nvidia Management library on '%v'", nvml.ErrorString(ret))
logger.Sugar().Warnf(msg)
return fmt.Errorf("%s", msg)
}
return nil
}
err = nil
return
}
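// printNvmlAndDriverVersion logs the NVML and NVIDIA driver versions for debugging purposes.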
func printNvmlAndDriverVersion(logger *zap.Logger) {
nvmlVersion, ret := nvml.SystemGetNVMLVersion()
if ret != nvml.SUCCESS {
logger.Sugar().Warnf("Unable to determine Nvidia Management library version on '%v'", nvml.ErrorString(ret))
} else {
logger.Sugar().Infof("Nvidia Management library version is %s", nvmlVersion)
}
driverVersion, ret := nvml.SystemGetDriverVersion()
if ret != nvml.SUCCESS {
logger.Sugar().Warnf("Unable to determine NVIDIA driver version on '%v'", nvml.ErrorString(ret))
} else {
logger.Sugar().Infof("NVIDIA driver version is %s", driverVersion)
}
}
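// discoverDevices enumerates the NVIDIA devices visible to NVML and returns their handles, model names, and UUIDs. Devices whose name or UUID cannot be read are skipped; an error is returned only if no usable device is found.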
func discoverDevices(logger *zap.Logger) ([]nvml.Device, []string, []string, error) {
count, ret := nvml.DeviceGetCount()
if ret != nvml.SUCCESS {
return nil, nil, nil, fmt.Errorf("Unable to get Nvidia device count on '%v'", nvml.ErrorString(ret))
}
devices := make([]nvml.Device, 0, count)
names := make([]string, 0, count)
UUIDs := make([]string, 0, count)
for i := 0; i < count; i++ {
device, ret := nvml.DeviceGetHandleByIndex(i)
if ret != nvml.SUCCESS {
logger.Sugar().Warnf("Unable to get Nvidia device at index %d on '%v'; ignoring device.", i, nvml.ErrorString(ret))
continue
}
// Note: UUID and Name queries should not fail under normal circumstances.
UUID, ret := device.GetUUID()
if ret != nvml.SUCCESS {
logger.Sugar().Warnf("Unable to get UUID of Nvidia device %d on '%v'; ignoring device.", i, nvml.ErrorString(ret))
continue
}
name, ret := device.GetName()
if ret != nvml.SUCCESS {
logger.Sugar().Warnf("Unable to get name of Nvidia device %d on '%v'; ignoring device.", i, nvml.ErrorString(ret))
continue
}
devices = append(devices, device)
UUIDs = append(UUIDs, UUID)
names = append(names, name)
logger.Sugar().Infof("Discovered Nvidia device %d of model %s with UUID %s.", i, name, UUID)
currMode, _, ret := device.GetMigMode()
if ret != nvml.SUCCESS {
logger.Sugar().Warnf("Unable to query MIG mode for Nvidia device %d.", i)
continue
}
if currMode == nvml.DEVICE_MIG_ENABLE {
logger.Sugar().Warnf("Nvidia device %d has MIG enabled. GPU utilization queries may not be supported.", i)
}
}
if len(devices) == 0 {
return nil, nil, nil, fmt.Errorf("No supported NVIDIA devices found")
}
return devices, names, UUIDs, nil
}
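// enableProcessAccountingModeOnSupportingDevices attempts to enable NVML process accounting on each device and records which devices accepted it. Enabling accounting may require elevated privileges, so failures are logged but not fatal.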
func enableProcessAccountingModeOnSupportingDevices(logger *zap.Logger, devices []nvml.Device) map[nvml.Device]bool {
deviceToAccountingIsEnabled := make(map[nvml.Device]bool, len(devices))
enableCount := 0
for gpuIndex, device := range devices {
ret := nvmlDeviceSetAccountingMode(device, nvml.FEATURE_ENABLED)
if ret != nvml.SUCCESS {
logger.Sugar().Warnf("Unable to set process accounting mode for Nvidia device %d on '%s'.", gpuIndex, nvml.ErrorString(ret))
deviceToAccountingIsEnabled[device] = false
continue
}
logger.Sugar().Infof("Successfully enabled process accounting mode for Nvidia device %d.", gpuIndex)
deviceToAccountingIsEnabled[device] = true
enableCount++
}
if enableCount == 0 {
logger.Sugar().Warnf("Unable to enable process metrics collection on any NVIDIA devices. No Nvidia process metrics will be collected.")
}
return deviceToAccountingIsEnabled
}
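// cleanup shuts down NVML if it was successfully initialized.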
func (client *nvmlClient) cleanup() error {
if client.handleCleanup != nil {
err := client.handleCleanup()
if err != nil {
return err
}
}
if !client.disable {
client.logger.Info("Shutdown Nvidia Management Library client")
}
return nil
}
func (client *nvmlClient) getDeviceModelName(gpuIndex uint) string {
return client.devicesModelName[gpuIndex]
}
func (client *nvmlClient) getDeviceUUID(gpuIndex uint) string {
return client.devicesUUID[gpuIndex]
}
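// collectDeviceMetrics gathers GPU utilization and memory metrics for all discovered devices. It is a no-op when the client is disabled.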
func (client *nvmlClient) collectDeviceMetrics() ([]deviceMetric, error) {
// Not strictly needed since len(client.devices) == 0 when the client is disabled, but safer.
if client.disable {
return nil, nil
}
deviceMetrics := client.collectDeviceUtilization()
deviceMetrics = append(deviceMetrics, client.collectDeviceMemoryInfo()...)
return deviceMetrics, nil
}
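// collectDeviceUtilization reports each device's average GPU utilization since the previous collection as the nvml.gpu.utilization metric.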
func (client *nvmlClient) collectDeviceUtilization() []deviceMetric {
deviceMetrics := make([]deviceMetric, 0, len(client.devices))
gpuUtil := deviceMetric{name: "nvml.gpu.utilization"}
for gpuIndex, device := range client.devices {
mean, err := client.getAverageGpuUtilizationSinceLastQuery(device)
if err != nil {
client.issueWarningForFailedQueryUptoThreshold(gpuIndex, gpuUtil.name, err.Error())
continue
}
gpuUtil.gpuIndex = uint(gpuIndex)
gpuUtil.time = time.Now()
gpuUtil.setFloat64(mean)
deviceMetrics = append(deviceMetrics, gpuUtil)
client.logger.Debugf("Nvidia device %d has GPU utilization of %.1f%%", gpuIndex, 100.0*gpuUtil.asFloat64())
}
return deviceMetrics
}
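// getAverageGpuUtilizationSinceLastQuery averages the GPU utilization samples newer than the device's last-seen timestamp and returns the result as a fraction in [0, 1], advancing the stored timestamp as a side effect.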
func (client *nvmlClient) getAverageGpuUtilizationSinceLastQuery(device nvml.Device) (float64, error) {
nvmlType, samples, ret := nvmlDeviceGetSamples(device, nvml.GPU_UTILIZATION_SAMPLES, client.deviceToLastSeenTimestamp[device])
if ret != nvml.SUCCESS {
return 0.0, fmt.Errorf("%v", nvml.ErrorString(ret))
}
var mean float64
var count int64
latestTimestamp := client.deviceToLastSeenTimestamp[device]
for _, sample := range samples {
value, err := nvmlSampleAsFloat64(sample.SampleValue, nvmlType)
if err != nil {
return 0.0, err
}
if sample.TimeStamp > client.deviceToLastSeenTimestamp[device] {
mean += value
count++
}
if sample.TimeStamp > latestTimestamp {
latestTimestamp = sample.TimeStamp
}
}
client.deviceToLastSeenTimestamp[device] = latestTimestamp
if count == 0 {
return 0.0, fmt.Errorf("No valid samples since last query")
}
mean /= 100.0 * float64(count)
return mean, nil
}
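// collectDeviceMemoryInfo reports each device's used and free GPU memory in bytes as the nvml.gpu.memory.bytes_used and nvml.gpu.memory.bytes_free metrics.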
func (client *nvmlClient) collectDeviceMemoryInfo() []deviceMetric {
deviceMetrics := make([]deviceMetric, 0, 2*len(client.devices))
gpuMemUsed := deviceMetric{name: "nvml.gpu.memory.bytes_used"}
gpuMemFree := deviceMetric{name: "nvml.gpu.memory.bytes_free"}
for gpuIndex, device := range client.devices {
memInfo, ret := nvmlDeviceGetMemoryInfo(device)
timestamp := time.Now()
if ret != nvml.SUCCESS {
client.issueWarningForFailedQueryUptoThreshold(gpuIndex, gpuMemUsed.name, nvml.ErrorString(ret))
continue
}
gpuMemUsed.gpuIndex = uint(gpuIndex)
gpuMemUsed.time = timestamp
gpuMemUsed.setInt64(int64(memInfo.Used))
deviceMetrics = append(deviceMetrics, gpuMemUsed)
gpuMemFree.gpuIndex = uint(gpuIndex)
gpuMemFree.time = timestamp
gpuMemFree.setInt64(int64(memInfo.Free))
deviceMetrics = append(deviceMetrics, gpuMemFree)
client.logger.Debugf("Nvidia device %d has %d bytes used and %d bytes free", gpuIndex, gpuMemUsed.asInt64(), gpuMemFree.asInt64())
}
return deviceMetrics
}
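// collectProcessMetrics reports lifetime GPU utilization and peak memory usage for processes currently running on devices that have accounting enabled, along with process metadata.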
func (client *nvmlClient) collectProcessMetrics() []processMetric {
if client.disable {
return nil
}
processMetrics := make([]processMetric, 0)
for gpuIndex, device := range client.devices {
if !client.deviceToAccountingIsEnabled[device] {
continue
}
pids, ret := nvmlDeviceGetAccountingPids(device)
if ret != nvml.SUCCESS {
msg := fmt.Sprintf("Unable to query cached PIDs on '%v", nvml.ErrorString(ret))
client.issueWarningForFailedQueryUptoThreshold(gpuIndex, "nvml.processes", msg)
continue
}
for _, pid := range pids {
metricName := fmt.Sprintf("nvml.processes{pid=%d}", pid)
stats, ret := nvml.DeviceGetAccountingStats(device, uint32(pid))
if ret != nvml.SUCCESS {
msg := fmt.Sprintf("Unable to query pid %d account statistics on '%v", pid, nvml.ErrorString(ret))
client.issueWarningForFailedQueryUptoThreshold(gpuIndex, metricName, msg)
continue
}
if stats.IsRunning != 1 {
continue
}
metric := processMetric{
time: time.Now(),
processPid: pid,
gpuIndex: uint(gpuIndex),
lifetimeGpuUtilization: uint64(stats.GpuUtilization),
lifetimeGpuMaxMemory: stats.MaxMemoryUsage,
}
err := metric.setMetadataLabels()
if err != nil {
metricName := fmt.Sprintf("nvml.processes{pid=%d}.metadata", metric.processPid)
client.issueWarningForFailedQueryUptoThreshold(int(metric.gpuIndex), metricName, err.Error())
}
processMetrics = append(processMetrics, metric)
client.logger.Debugf("Found pid %d (owner %s command %s) has used Nvidia device %d\n",
metric.processPid, metric.owner, metric.commandLine, metric.gpuIndex)
}
}
return processMetrics
}
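// setMetadataLabels fills in the process name, command, command line (truncated to 1024 characters), and owning user for the metric's PID using gopsutil.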
func (metric *processMetric) setMetadataLabels() error {
process, err := process.NewProcess(int32(metric.processPid))
if err != nil {
return fmt.Errorf("Unable to obtain process handle for pid %d to query for metadata on '%v'", metric.processPid, err)
}
metric.processName, err = process.Name()
if err != nil {
return fmt.Errorf("Unable to query pid %d process name on '%v'", metric.processPid, err)
}
commandLineSlice, err := process.CmdlineSlice()
if err != nil {
return fmt.Errorf("Unable to query pid %d command line slice on '%v'", metric.processPid, err)
}
if len(commandLineSlice) > 0 {
metric.command = commandLineSlice[0]
}
metric.commandLine = strings.Join(commandLineSlice, " ")
if len(metric.commandLine) > 1024 {
metric.commandLine = metric.commandLine[:1024]
}
metric.owner, err = process.Username()
if err != nil {
return fmt.Errorf("Unable to query pid %d username on '%v'", metric.processPid, err)
}
return nil
}
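// issueWarningForFailedQueryUptoThreshold logs a warning for a failed device metric query, up to maxWarningsForFailedDeviceMetricQuery times per device/metric pair, then suppresses further warnings.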
func (client *nvmlClient) issueWarningForFailedQueryUptoThreshold(deviceIdx int, metricName string, reason string) {
deviceMetric := fmt.Sprintf("device%d.%s", deviceIdx, metricName)
client.deviceMetricToFailedQueryCount[deviceMetric]++
failedCount := client.deviceMetricToFailedQueryCount[deviceMetric]
if failedCount <= maxWarningsForFailedDeviceMetricQuery {
client.logger.Warnf("Unable to query '%s' for Nvidia device %d on '%s'", metricName, deviceIdx, reason)
if failedCount == maxWarningsForFailedDeviceMetricQuery {
client.logger.Warnf("Surpressing further device query warnings for '%s' for Nvidia device %d", metricName, deviceIdx)
}
}
}