components/otelopscol/receiver/dcgmreceiver/client.go (297 lines of code) (raw):
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build gpu
// +build gpu
package dcgmreceiver
import (
"errors"
"fmt"
"math"
"time"
"github.com/NVIDIA/go-dcgm/pkg/dcgm"
"go.uber.org/zap"
)
// maxWarningsForFailedDeviceMetricQuery caps how many warnings are logged for
// a repeatedly failing query key before further warnings are suppressed.
const maxWarningsForFailedDeviceMetricQuery = 5
// dcgmProfilingFieldsStart is the first field ID treated as a profiling field.
// Requested fields with a smaller ID are assumed to be always available and
// are not checked against the list of supported profiling fields.
const dcgmProfilingFieldsStart = dcgm.Short(1000)
// ErrDcgmInitialization is joined onto the error returned by newClient when
// the connection to the DCGM daemon cannot be initialized, so callers can
// detect that case with errors.Is.
var ErrDcgmInitialization = errors.New("error initializing DCGM")
// dcgmClientSettings carries the configuration newClient uses to build a
// dcgmClient.
type dcgmClientSettings struct {
// endpoint is the address of the DCGM daemon to connect to.
endpoint string
// pollingInterval is used as the DCGM watch update frequency and is copied
// onto the client, where collect uses it to estimate the polling interval.
pollingInterval time.Duration
// retryBlankValues and maxRetries are copied verbatim onto the client.
// NOTE(review): presumably they govern re-querying of blank samples; their
// consumers are not visible in this part of the file - confirm.
retryBlankValues bool
maxRetries int
// fields lists the requested DCGM field names; they are resolved to field
// IDs through dcgm.DCGM_FI.
fields []string
}
// deviceMetrics holds the identity of one GPU together with the metric
// statistics accumulated for it.
type deviceMetrics struct {
// ModelName is the device model reported by DCGM device info.
ModelName string
// UUID is the device UUID reported by DCGM device info.
UUID string
// Metrics maps a DCGM field name to the statistics gathered for that field.
Metrics MetricsMap
}
// dcgmClient wraps a connection to the DCGM daemon and the state needed to
// poll it for field values.
type dcgmClient struct {
logger *zap.SugaredLogger
// handleCleanup is the function returned by DCGM initialization; cleanup
// calls it when it is non-nil.
handleCleanup func()
// enabledFieldIDs are the requested fields that passed the support filter.
// When empty, no device group or field group is created and collect is a
// no-op.
enabledFieldIDs []dcgm.Short
// enabledFieldGroup is the DCGM field group being watched.
enabledFieldGroup dcgm.FieldHandle
// deviceGroup is the DCGM group holding the supported GPUs.
deviceGroup dcgm.GroupHandle
// devices is keyed by DCGM GPU index and filled in by collect the first
// time a value for a GPU is seen.
devices map[uint]deviceMetrics
// lastSuccessfulPoll is the "since" time passed to the next poll; it is
// advanced to the poll time returned by each successful poll.
lastSuccessfulPoll time.Time
// deviceMetricToFailedQueryCount counts warnings per key so that repeated
// warnings can be capped.
deviceMetricToFailedQueryCount map[string]int
pollingInterval time.Duration
// retryBlankValues and maxRetries are copied from the settings.
// NOTE(review): not read anywhere in the visible code - confirm usage.
retryBlankValues bool
maxRetries int
}
// dcgmInit initializes DCGM in standalone mode, forwarding args to dcgm.Init.
// The mode is fixed here rather than passed in: we can't pass an argument of
// type dcgm.mode because it is unexported.
// NOTE(review): declared as a variable, presumably so tests can substitute a
// fake - confirm.
var dcgmInit = func(args ...string) (func(), error) {
return dcgm.Init(dcgm.Standalone, args...)
}
// dcgmGetValuesSince is the function collect uses to fetch field values newer
// than a given time.
// NOTE(review): held in a variable, presumably as a seam for tests - confirm.
var dcgmGetValuesSince = dcgm.GetValuesSince
// newClient connects to the DCGM daemon at settings.endpoint, works out which
// of the requested fields can be collected, and, when at least one field is
// enabled, creates the device group and the field watches needed to poll them.
//
// When the connection cannot be established the returned error is joined with
// ErrDcgmInitialization. On any later failure everything set up so far (field
// group, device group, DCGM connection) is released before returning, so a
// failed call does not leak DCGM resources.
func newClient(settings *dcgmClientSettings, logger *zap.Logger) (*dcgmClient, error) {
	sugar := logger.Sugar()

	dcgmCleanup, err := initializeDcgm(settings.endpoint, logger)
	if err != nil {
		return nil, errors.Join(ErrDcgmInitialization, err)
	}
	// The DCGM connection is open from here on. Release it on every failure
	// path; on success its ownership moves to the returned client.
	succeeded := false
	defer func() {
		if !succeeded && dcgmCleanup != nil {
			dcgmCleanup()
		}
	}()

	enabledFieldGroup := dcgm.FieldHandle{}
	requestedFieldIDs := toFieldIDs(settings.fields)
	supportedProfilingFieldIDs, err := getSupportedProfilingFields()
	if err != nil {
		// If there is error querying the supported fields at all, let the
		// receiver collect basic metrics: (GPU utilization, used/free memory).
		// The error is formatted with %v: %w is only understood by fmt.Errorf.
		sugar.Warnf("Error querying supported profiling fields on '%v'. GPU profiling metrics will not be collected.", err)
	}
	enabledFields, unavailableFields := filterSupportedFields(requestedFieldIDs, supportedProfilingFieldIDs)
	for _, f := range unavailableFields {
		sugar.Warnf("Field '%s' is not supported", dcgmIDToName[f])
	}

	var deviceGroup dcgm.GroupHandle
	if len(enabledFields) != 0 {
		supportedDeviceIndices, err := dcgm.GetSupportedDevices()
		if err != nil {
			return nil, fmt.Errorf("Unable to discover supported GPUs on %w", err)
		}
		sugar.Infof("Discovered %d supported GPU devices", len(supportedDeviceIndices))

		deviceGroup, err = createDeviceGroup(logger, supportedDeviceIndices)
		if err != nil {
			return nil, err
		}

		enabledFieldGroup, err = setWatchesOnEnabledFields(settings.pollingInterval, logger, deviceGroup, enabledFields)
		if err != nil {
			// Best-effort teardown: the watch error is what the caller needs
			// to see, so failures while destroying the groups are ignored.
			_ = dcgm.FieldGroupDestroy(enabledFieldGroup)
			_ = dcgm.DestroyGroup(deviceGroup)
			return nil, fmt.Errorf("Unable to set field watches on %w", err)
		}
	}

	succeeded = true
	return &dcgmClient{
		logger:                         sugar,
		handleCleanup:                  dcgmCleanup,
		enabledFieldIDs:                enabledFields,
		enabledFieldGroup:              enabledFieldGroup,
		deviceGroup:                    deviceGroup,
		devices:                        map[uint]deviceMetrics{},
		lastSuccessfulPoll:             time.Now(),
		deviceMetricToFailedQueryCount: make(map[string]int),
		pollingInterval:                settings.pollingInterval,
		retryBlankValues:               settings.retryBlankValues,
		maxRetries:                     settings.maxRetries,
	}, nil
}
// initializeDcgm tries to initialize a DCGM connection to endpoint; it returns
// a cleanup func only if the connection is initialized successfully.
//
// On failure a warning is logged, any cleanup func handed back by the failed
// initialization is invoked, and the returned error wraps the underlying
// cause so callers can inspect it with errors.Is / errors.As.
func initializeDcgm(endpoint string, logger *zap.Logger) (func(), error) {
	// NOTE(review): "0" presumably tells DCGM that endpoint is not a Unix
	// socket path - confirm against the go-dcgm Init documentation.
	const isSocket = "0"

	dcgmCleanup, err := dcgmInit(endpoint, isSocket)
	if err != nil {
		// %w renders exactly like %v, so the message text is unchanged while
		// the cause stays reachable through the error chain.
		connErr := fmt.Errorf("Unable to connect to DCGM daemon at %s on %w; Is the DCGM daemon running?", endpoint, err)
		logger.Sugar().Warn(connErr.Error())
		if dcgmCleanup != nil {
			dcgmCleanup()
		}
		return nil, connErr
	}

	logger.Sugar().Infof("Connected to DCGM daemon at %s", endpoint)
	return dcgmCleanup, nil
}
// newDeviceMetrics queries DCGM for information about the GPU with index
// gpuIndex and returns a deviceMetrics carrying its model name and UUID along
// with an empty metrics map.
//
// If the device info query fails, a warning is logged and the error is
// returned together with a zero deviceMetrics.
func newDeviceMetrics(logger *zap.SugaredLogger, gpuIndex uint) (deviceMetrics, error) {
	deviceInfo, err := dcgm.GetDeviceInfo(gpuIndex)
	if err != nil {
		// Format with %v: the %w verb is only understood by fmt.Errorf and
		// would be rendered as "%!w(...)" by the logger.
		logger.Warnf("Unable to query device info for NVIDIA device %d on '%v'", gpuIndex, err)
		return deviceMetrics{}, err
	}

	device := deviceMetrics{
		ModelName: deviceInfo.Identifiers.Model,
		UUID:      deviceInfo.UUID,
		Metrics:   MetricsMap{},
	}
	logger.Infof("Discovered NVIDIA device %s with UUID %s (DCGM GPU ID %d)", device.ModelName, device.UUID, gpuIndex)
	return device, nil
}
// createDeviceGroup creates the DCGM GPU group used by this receiver and adds
// every GPU in deviceIndices to it.
//
// It returns the handle of the populated group. If the group cannot be
// created, or any device cannot be added, an error is returned along with a
// zero handle; in the latter case the partially populated group is destroyed
// first so that it does not linger in the DCGM daemon.
func createDeviceGroup(logger *zap.Logger, deviceIndices []uint) (dcgm.GroupHandle, error) {
	deviceGroupName := "google-cloud-ops-agent-group"
	deviceGroup, err := dcgm.CreateGroup(deviceGroupName)
	if err != nil {
		return dcgm.GroupHandle{}, fmt.Errorf("Unable to create DCGM GPU group '%s' on %w", deviceGroupName, err)
	}

	for _, gpuIndex := range deviceIndices {
		if err := dcgm.AddToGroup(deviceGroup, gpuIndex); err != nil {
			// The caller only gets a zero handle back and so cannot release
			// the group itself. Destroying is best-effort: the AddToGroup
			// error is the one worth reporting.
			_ = dcgm.DestroyGroup(deviceGroup)
			return dcgm.GroupHandle{}, fmt.Errorf("Unable add NVIDIA device %d to GPU group '%s' on %w", gpuIndex, deviceGroupName, err)
		}
	}

	logger.Sugar().Infof("Created GPU group '%s'", deviceGroupName)
	return deviceGroup, nil
}
// toFieldIDs translates DCGM field names into their numeric field IDs using
// the dcgm.DCGM_FI lookup table. The result has one entry per input name, in
// the same order; a name missing from the table yields the zero ID.
func toFieldIDs(fields []string) []dcgm.Short {
	ids := make([]dcgm.Short, 0, len(fields))
	for _, name := range fields {
		ids = append(ids, dcgm.DCGM_FI[name])
	}
	return ids
}
// getSupportedProfilingFields asks DCGM which profiling fields the GPUs on
// this host support and returns their IDs flattened into a single slice.
//
// A DCGM_ST_MODULE_NOT_LOADED error is not treated as a failure: it yields an
// empty slice and a nil error. Any other error is returned together with an
// empty slice.
func getSupportedProfilingFields() ([]dcgm.Short, error) {
	fieldIDs := []dcgm.Short{}

	// GetSupportedMetricGroups currently does not support passing the actual
	// group handle, so 0 is passed to query group 0: the default DCGM group,
	// which is **supposed** to include all GPUs of the host.
	metricGroups, err := dcgm.GetSupportedMetricGroups(0)
	if err != nil {
		// A device without profiling metrics support (e.g. NVIDIA P4) makes
		// the query fail with DCGM_ST_MODULE_NOT_LOADED: "This request is
		// serviced by a module of DCGM that is not currently loaded."
		var dcgmErr *dcgm.DcgmError
		if errors.As(err, &dcgmErr) && dcgmErr.Code == dcgm.DCGM_ST_MODULE_NOT_LOADED {
			return fieldIDs, nil
		}
		return fieldIDs, err
	}

	for _, group := range metricGroups {
		for _, id := range group.FieldIds {
			fieldIDs = append(fieldIDs, dcgm.Short(id))
		}
	}
	return fieldIDs, nil
}
// filterSupportedFields partitions the user requested fields into two lists,
// each preserving request order: the fields to enable (requested and
// supported) and the unavailable fields (requested but not supported).
//
// A field counts as supported when it is listed in supportedProfilingFields
// or when its ID is below dcgmProfilingFieldsStart.
func filterSupportedFields(requestedFields []dcgm.Short, supportedProfilingFields []dcgm.Short) ([]dcgm.Short, []dcgm.Short) {
	profilingSupported := make(map[dcgm.Short]bool, len(supportedProfilingFields))
	for _, id := range supportedProfilingFields {
		profilingSupported[id] = true
	}

	var enabled, unavailable []dcgm.Short
	for _, id := range requestedFields {
		// Fields like `DCGM_FI_DEV_*` are not profiling fields and are
		// assumed to be always present.
		if id < dcgmProfilingFieldsStart || profilingSupported[id] {
			enabled = append(enabled, id)
			continue
		}
		unavailable = append(unavailable, id)
	}
	return enabled, unavailable
}
// dcgmWatchParams bundles the arguments setWatchesOnFields needs to create a
// field group and start watching it. Internal-only.
type dcgmWatchParams struct {
// fieldGroupName is the name given to the DCGM field group that is created.
fieldGroupName string
// updateFreqUs is the watch update frequency handed to DCGM; callers in
// this file compute it as a duration expressed in microseconds.
updateFreqUs int64
// maxKeepTime and maxKeepSamples bound how much data DCGM retains.
// Note: DCGM retained samples = Max(maxKeepSamples, maxKeepTime/updateFreq)
maxKeepTime float64
maxKeepSamples int32
}
// setWatchesOnFields creates a DCGM field group named params.fieldGroupName
// containing fieldIDs and starts watching it on deviceGroup with the update
// frequency and retention limits given in params. Internal-only.
//
// It returns the handle of the created field group. If creating the group
// fails, a zero handle is returned. If setting the watches fails, the handle
// of the already created group is returned alongside the error so that the
// caller can destroy it.
func setWatchesOnFields(logger *zap.Logger, deviceGroup dcgm.GroupHandle, fieldIDs []dcgm.Short, params dcgmWatchParams) (dcgm.FieldHandle, error) {
	fieldGroup, err := dcgm.FieldGroupCreate(params.fieldGroupName, fieldIDs)
	if err != nil {
		// Keep the underlying cause in the chain instead of discarding it.
		return dcgm.FieldHandle{}, fmt.Errorf("Unable to create DCGM field group '%s' on %w", params.fieldGroupName, err)
	}

	msg := fmt.Sprintf("Created DCGM field group '%s' with field ids: ", params.fieldGroupName)
	for _, fieldID := range fieldIDs {
		msg += fmt.Sprintf("%d ", fieldID)
	}
	logger.Sugar().Info(msg)

	// Note: DCGM retained samples = Max(maxKeepSamples, maxKeepTime/updateFreq)
	err = dcgm.WatchFieldsWithGroupEx(fieldGroup, deviceGroup, params.updateFreqUs, params.maxKeepTime, params.maxKeepSamples)
	if err != nil {
		return fieldGroup, fmt.Errorf("Setting watches for DCGM field group '%s' failed on %w", params.fieldGroupName, err)
	}

	logger.Sugar().Infof("Setting watches for DCGM field group '%s' succeeded", params.fieldGroupName)
	return fieldGroup, nil
}
// maxKeepSamples is the sample-count retention limit requested when setting
// watches on the enabled fields. collect also multiplies it by the polling
// interval to derive a lower bound for the interval estimate it returns.
const maxKeepSamples = 100 // TODO: Is this enough?
// setWatchesOnEnabledFields starts watching enabledFieldIDs on deviceGroup,
// with pollingInterval as the update frequency, under a field group whose
// name carries a random numeric suffix. It returns whatever
// setWatchesOnFields returns.
func setWatchesOnEnabledFields(pollingInterval time.Duration, logger *zap.Logger, deviceGroup dcgm.GroupHandle, enabledFieldIDs []dcgm.Short) (dcgm.FieldHandle, error) {
	// Note: the random suffix avoids name conflicts amongst any parallel collectors.
	groupName := fmt.Sprintf("google-cloud-ops-agent-metrics-%d", randSource.Intn(10000))

	// Note: DCGM retained samples = Max(maxKeepSamples, maxKeepTime/updateFreq)
	watch := dcgmWatchParams{
		fieldGroupName: groupName,
		updateFreqUs:   int64(pollingInterval / time.Microsecond),
		maxKeepTime:    600.0, /* 10 min */
		maxKeepSamples: maxKeepSamples,
	}
	return setWatchesOnFields(logger, deviceGroup, enabledFieldIDs, watch)
}
// cleanup releases the DCGM resources held by the client: the watched field
// group, the device group, and finally the DCGM connection itself. Failures
// while destroying a group are logged as warnings and do not stop the rest of
// the teardown.
func (client *dcgmClient) cleanup() {
	// The field group and device group are only created when at least one
	// field is enabled; otherwise both handles are zero values that were
	// never allocated, so there is nothing to destroy.
	if len(client.enabledFieldIDs) != 0 {
		if err := dcgm.FieldGroupDestroy(client.enabledFieldGroup); err != nil {
			client.logger.Warnf("Unable to destroy DCGM field group on '%v'", err)
		}
		if err := dcgm.DestroyGroup(client.deviceGroup); err != nil {
			client.logger.Warnf("Unable to destroy DCGM GPU group on '%v'", err)
		}
	}
	if client.handleCleanup != nil {
		client.handleCleanup()
	}
	client.logger.Info("Shutdown DCGM")
}
// collect will poll dcgm for any new metrics, updating client.devices as
// appropriate. It returns the estimated polling interval.
//
// When no fields are enabled it does nothing and returns 0. If the poll
// itself fails, a (rate-limited) warning is logged and the error is returned
// with a zero duration. Otherwise every GPU field value received since the
// last successful poll is validated and folded into the per-device metric
// statistics; blank, unsupported and otherwise invalid values are skipped.
func (client *dcgmClient) collect() (time.Duration, error) {
	client.logger.Debugf("Polling DCGM daemon for field values")
	if len(client.enabledFieldIDs) == 0 {
		// Make sure we don't try to scrape without a device group (since we
		// don't construct one when there are no enabled fields).
		return 0, nil
	}

	fieldValues, pollTime, err := dcgmGetValuesSince(client.deviceGroup, client.enabledFieldGroup, client.lastSuccessfulPoll)
	if err != nil {
		msg := fmt.Sprintf("Unable to poll DCGM daemon for metrics: %s", err)
		client.issueWarningForFailedQueryUptoThreshold("all-profiling-metrics", maxWarningsForFailedDeviceMetricQuery, msg)
		return 0, err
	}
	client.logger.Debugf("Got %d field values over %s", len(fieldValues), pollTime.Sub(client.lastSuccessfulPoll))
	client.lastSuccessfulPoll = pollTime

	// Track the timestamp range covered by the valid samples of this poll.
	oldestTs := int64(math.MaxInt64)
	newestTs := int64(0)
	for _, fieldValue := range fieldValues {
		if fieldValue.EntityGroupId != dcgm.FE_GPU {
			continue
		}

		gpuIndex := fieldValue.EntityId
		device, known := client.devices[gpuIndex]
		if !known {
			// First value seen for this GPU: look up its identity once.
			discovered, err := newDeviceMetrics(client.logger, gpuIndex)
			if err != nil {
				continue
			}
			client.devices[gpuIndex] = discovered
			device = discovered
		}

		dcgmName := dcgmIDToName[dcgm.Short(fieldValue.FieldId)]
		switch err := isValidValue(fieldValue); {
		case errors.Is(err, errBlankValue):
			// Blank values are expected at startup.
			continue
		case errors.Is(err, errNotSupported):
			client.issueWarningForFailedQueryUptoThreshold(dcgmName, 1, fmt.Sprintf("Field '%s' is not supported", dcgmName))
			continue
		case err != nil:
			msg := fmt.Sprintf("Received invalid value (ts %d gpu %d) %s: %v", fieldValue.Ts, gpuIndex, dcgmName, err)
			client.issueWarningForFailedQueryUptoThreshold(fmt.Sprintf("device%d.%s", gpuIndex, dcgmName), maxWarningsForFailedDeviceMetricQuery, msg)
			continue
		}

		oldestTs = min(oldestTs, fieldValue.Ts)
		newestTs = max(newestTs, fieldValue.Ts)

		// device is a copy of the struct, but its Metrics map is shared with
		// the entry stored in client.devices, so updates land in the client.
		stats, ok := device.Metrics[dcgmName]
		if !ok {
			stats = &metricStats{}
			device.Metrics[dcgmName] = stats
		}
		stats.Update(fieldValue)
	}

	// If no valid sample was seen, oldestTs and newestTs still hold their
	// initial sentinels; subtracting and scaling them would overflow int64
	// and report a bogus span, so treat that case as an empty span.
	var duration time.Duration
	if newestTs >= oldestTs {
		duration = time.Duration(newestTs-oldestTs) * time.Microsecond
	}
	client.logger.Debugf("Successful poll of DCGM daemon returned %v of data", duration)
	// If we did a partial poll, there should be more room in the buffer.
	duration = max(duration, client.pollingInterval*maxKeepSamples)
	return duration, nil
}
// getDeviceMetrics returns a deep copy of client.devices: the outer map, each
// device's metrics map and every metric statistics value are duplicated, so
// the caller can use the result without touching the client's own state.
func (client *dcgmClient) getDeviceMetrics() map[uint]deviceMetrics {
	snapshot := make(map[uint]deviceMetrics, len(client.devices))
	for idx, src := range client.devices {
		clonedMetrics := make(MetricsMap, len(src.Metrics))
		for name, stats := range src.Metrics {
			statsCopy := *stats
			clonedMetrics[name] = &statsCopy
		}
		// src is the loop's own copy of the struct, so only the map field
		// needs to be swapped for the clone.
		src.Metrics = clonedMetrics
		snapshot[idx] = src
	}
	return snapshot
}
// issueWarningForFailedQueryUptoThreshold records one more failure under the
// key dcgmName and logs reason as a warning as long as the number of failures
// recorded for that key does not exceed limit. When limit is greater than 1,
// the warning that reaches the limit is followed by a notice that further
// warnings for the key will not be logged.
func (client *dcgmClient) issueWarningForFailedQueryUptoThreshold(dcgmName string, limit int, reason string) {
	count := client.deviceMetricToFailedQueryCount[dcgmName] + 1
	client.deviceMetricToFailedQueryCount[dcgmName] = count

	if count > limit {
		return
	}
	client.logger.Warn(reason)
	if count == limit && limit > 1 {
		client.logger.Warnf("Surpressing further device query warnings for '%s'", dcgmName)
	}
}