components/otelopscol/receiver/dcgmreceiver/scraper.go (317 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //go:build gpu // +build gpu package dcgmreceiver import ( "context" "errors" "fmt" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "golang.org/x/sync/errgroup" "github.com/GoogleCloudPlatform/opentelemetry-operations-collector/components/otelopscol/receiver/dcgmreceiver/internal/metadata" ) type dcgmScraper struct { config *Config settings receiver.Settings initRetryDelay time.Duration mb *metadata.MetricsBuilder collectTriggerCh chan<- struct{} metricsCh <-chan map[uint]deviceMetrics cancel func() } func newDcgmScraper(config *Config, settings receiver.Settings) *dcgmScraper { return &dcgmScraper{config: config, settings: settings, initRetryDelay: 10 * time.Second} } const scrapePollingInterval = 100 * time.Millisecond // TODO: Choose an appropriate value // initClient will try to initialize the communication with the DCGM service; if // success, create a client; only return errors if DCGM service is available but // failed to create client. func (s *dcgmScraper) initClient() (*dcgmClient, error) { clientSettings := &dcgmClientSettings{ endpoint: s.config.TCPAddrConfig.Endpoint, pollingInterval: scrapePollingInterval, fields: discoverRequestedFields(s.config), retryBlankValues: true, maxRetries: 5, } client, err := newClient(clientSettings, s.settings.Logger) if err != nil { s.settings.Logger.Sugar().Warn(err) if errors.Is(err, ErrDcgmInitialization) { // If cannot connect to DCGM, return no error and retry at next // collection time return nil, nil } return nil, err } return client, nil } func (s *dcgmScraper) start(ctx context.Context, _ component.Host) error { startTime := pcommon.NewTimestampFromTime(time.Now()) mbConfig := metadata.DefaultMetricsBuilderConfig() mbConfig.Metrics = s.config.Metrics s.mb = metadata.NewMetricsBuilder( mbConfig, s.settings, metadata.WithStartTime(startTime)) scrapeCtx, scrapeCancel := context.WithCancel(context.WithoutCancel(ctx)) g, scrapeCtx := errgroup.WithContext(scrapeCtx) s.cancel = func() { scrapeCancel() _ = g.Wait() // Ignore the error from a canceled context } metricsCh := make(chan map[uint]deviceMetrics) collectTriggerCh := make(chan struct{}, 1) // Capacity of 1 makes this asynchronous s.metricsCh = metricsCh s.collectTriggerCh = collectTriggerCh g.Go(func() error { return s.runConnectLoop(scrapeCtx, metricsCh, collectTriggerCh) }) return nil } func (s *dcgmScraper) stop(_ context.Context) error { if s.cancel != nil { s.cancel() s.cancel = nil } return nil } func discoverRequestedFields(config *Config) []string { requestedFields := []string{} if config.Metrics.GpuDcgmUtilization.Enabled { requestedFields = append(requestedFields, "DCGM_FI_PROF_GR_ENGINE_ACTIVE") requestedFields = append(requestedFields, "DCGM_FI_DEV_GPU_UTIL") // fallback } if config.Metrics.GpuDcgmSmUtilization.Enabled { requestedFields = append(requestedFields, "DCGM_FI_PROF_SM_ACTIVE") } if config.Metrics.GpuDcgmSmOccupancy.Enabled { requestedFields = append(requestedFields, "DCGM_FI_PROF_SM_OCCUPANCY") } if config.Metrics.GpuDcgmPipeUtilization.Enabled { requestedFields = append(requestedFields, "DCGM_FI_PROF_PIPE_TENSOR_ACTIVE") requestedFields = append(requestedFields, "DCGM_FI_PROF_PIPE_FP64_ACTIVE") requestedFields = append(requestedFields, "DCGM_FI_PROF_PIPE_FP32_ACTIVE") requestedFields = append(requestedFields, "DCGM_FI_PROF_PIPE_FP16_ACTIVE") } if config.Metrics.GpuDcgmCodecEncoderUtilization.Enabled { requestedFields = append(requestedFields, "DCGM_FI_DEV_ENC_UTIL") } if config.Metrics.GpuDcgmCodecDecoderUtilization.Enabled { requestedFields = append(requestedFields, "DCGM_FI_DEV_DEC_UTIL") } if config.Metrics.GpuDcgmMemoryBytesUsed.Enabled { requestedFields = append(requestedFields, "DCGM_FI_DEV_FB_FREE") requestedFields = append(requestedFields, "DCGM_FI_DEV_FB_USED") requestedFields = append(requestedFields, "DCGM_FI_DEV_FB_RESERVED") } if config.Metrics.GpuDcgmMemoryBandwidthUtilization.Enabled { requestedFields = append(requestedFields, "DCGM_FI_PROF_DRAM_ACTIVE") requestedFields = append(requestedFields, "DCGM_FI_DEV_MEM_COPY_UTIL") // fallback } if config.Metrics.GpuDcgmPcieIo.Enabled { requestedFields = append(requestedFields, "DCGM_FI_PROF_PCIE_TX_BYTES") requestedFields = append(requestedFields, "DCGM_FI_PROF_PCIE_RX_BYTES") } if config.Metrics.GpuDcgmNvlinkIo.Enabled { requestedFields = append(requestedFields, "DCGM_FI_PROF_NVLINK_TX_BYTES") requestedFields = append(requestedFields, "DCGM_FI_PROF_NVLINK_RX_BYTES") } if config.Metrics.GpuDcgmEnergyConsumption.Enabled { requestedFields = append(requestedFields, "DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION") requestedFields = append(requestedFields, "DCGM_FI_DEV_POWER_USAGE") // fallback } if config.Metrics.GpuDcgmTemperature.Enabled { requestedFields = append(requestedFields, "DCGM_FI_DEV_GPU_TEMP") } if config.Metrics.GpuDcgmClockFrequency.Enabled { requestedFields = append(requestedFields, "DCGM_FI_DEV_SM_CLOCK") } if config.Metrics.GpuDcgmClockThrottleDurationTime.Enabled { requestedFields = append(requestedFields, "DCGM_FI_DEV_POWER_VIOLATION") requestedFields = append(requestedFields, "DCGM_FI_DEV_THERMAL_VIOLATION") requestedFields = append(requestedFields, "DCGM_FI_DEV_SYNC_BOOST_VIOLATION") requestedFields = append(requestedFields, "DCGM_FI_DEV_BOARD_LIMIT_VIOLATION") requestedFields = append(requestedFields, "DCGM_FI_DEV_LOW_UTIL_VIOLATION") requestedFields = append(requestedFields, "DCGM_FI_DEV_RELIABILITY_VIOLATION") requestedFields = append(requestedFields, "DCGM_FI_DEV_TOTAL_APP_CLOCKS_VIOLATION") requestedFields = append(requestedFields, "DCGM_FI_DEV_TOTAL_BASE_CLOCKS_VIOLATION") } if config.Metrics.GpuDcgmEccErrors.Enabled { requestedFields = append(requestedFields, "DCGM_FI_DEV_ECC_SBE_VOL_TOTAL") requestedFields = append(requestedFields, "DCGM_FI_DEV_ECC_DBE_VOL_TOTAL") } if config.Metrics.GpuDcgmXidErrors.Enabled { // requestedFields = append(requestedFields, "") func() {}() // no-op } return requestedFields } func (s *dcgmScraper) runConnectLoop(ctx context.Context, metricsCh chan<- map[uint]deviceMetrics, collectTriggerCh <-chan struct{}) error { defer close(metricsCh) for { client, _ := s.initClient() // Ignore the error; it's logged in initClient. if client != nil { s.pollClient(ctx, client, metricsCh, collectTriggerCh) } select { case <-ctx.Done(): return ctx.Err() case metricsCh <- map[uint]deviceMetrics{}: // Un-hang any scrapers waiting for data, since we currently have no metrics to offer. case <-time.After(s.initRetryDelay): } } } func (s *dcgmScraper) pollClient(ctx context.Context, client *dcgmClient, metricsCh chan<- map[uint]deviceMetrics, collectTriggerCh <-chan struct{}) { defer client.cleanup() for { waitTime, err := client.collect() // Ignore the error; it's logged in collect() if err != nil { waitTime = 10 * time.Second } // Try to poll at least twice per collection interval waitTime = max( 100*time.Millisecond, min( s.config.CollectionInterval, waitTime, )/2, ) s.settings.Logger.Sugar().Debugf("Waiting %s for the next collection", waitTime) after := time.After(waitTime) for after != nil { deviceMetrics := client.getDeviceMetrics() select { case <-ctx.Done(): return case <-collectTriggerCh: // Loop and trigger a collect() again. after = nil case metricsCh <- deviceMetrics: case <-after: after = nil } } } } func (s *dcgmScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { var deviceMetrics map[uint]deviceMetrics // Trigger a collection cycle to make sure we have fresh metrics. // The select ensures that if there's already a request registered we don't block. select { case s.collectTriggerCh <- struct{}{}: default: } // Now wait for metrics. select { case deviceMetrics = <-s.metricsCh: case <-ctx.Done(): return pmetric.NewMetrics(), ctx.Err() } s.settings.Logger.Sugar().Debugf("Metrics collected: %d", len(deviceMetrics)) now := pcommon.NewTimestampFromTime(time.Now()) for gpuIndex, gpu := range deviceMetrics { s.settings.Logger.Sugar().Debugf("Got %d unique metrics: %v", len(gpu.Metrics), gpu.Metrics) rb := s.mb.NewResourceBuilder() rb.SetGpuNumber(fmt.Sprintf("%d", gpuIndex)) rb.SetGpuUUID(gpu.UUID) rb.SetGpuModel(gpu.ModelName) gpuResource := rb.Emit() v, ok := gpu.Metrics.LastFloat64("DCGM_FI_PROF_GR_ENGINE_ACTIVE") if !ok { v, ok = gpu.Metrics.LastFloat64("DCGM_FI_DEV_GPU_UTIL") v /= 100.0 /* normalize */ } if ok { s.mb.RecordGpuDcgmUtilizationDataPoint(now, v) } if v, ok := gpu.Metrics.LastFloat64("DCGM_FI_PROF_SM_ACTIVE"); ok { s.mb.RecordGpuDcgmSmUtilizationDataPoint(now, v) } if v, ok := gpu.Metrics.LastFloat64("DCGM_FI_PROF_SM_OCCUPANCY"); ok { s.mb.RecordGpuDcgmSmOccupancyDataPoint(now, v) } if v, ok := gpu.Metrics.LastFloat64("DCGM_FI_PROF_PIPE_TENSOR_ACTIVE"); ok { s.mb.RecordGpuDcgmPipeUtilizationDataPoint(now, v, metadata.AttributeGpuPipeTensor) } if v, ok := gpu.Metrics.LastFloat64("DCGM_FI_PROF_PIPE_FP64_ACTIVE"); ok { s.mb.RecordGpuDcgmPipeUtilizationDataPoint(now, v, metadata.AttributeGpuPipeFp64) } if v, ok := gpu.Metrics.LastFloat64("DCGM_FI_PROF_PIPE_FP32_ACTIVE"); ok { s.mb.RecordGpuDcgmPipeUtilizationDataPoint(now, v, metadata.AttributeGpuPipeFp32) } if v, ok := gpu.Metrics.LastFloat64("DCGM_FI_PROF_PIPE_FP16_ACTIVE"); ok { s.mb.RecordGpuDcgmPipeUtilizationDataPoint(now, v, metadata.AttributeGpuPipeFp16) } if v, ok := gpu.Metrics.LastFloat64("DCGM_FI_DEV_ENC_UTIL"); ok { s.mb.RecordGpuDcgmCodecEncoderUtilizationDataPoint(now, v/100.0) /* normalize */ } if v, ok := gpu.Metrics.LastFloat64("DCGM_FI_DEV_DEC_UTIL"); ok { s.mb.RecordGpuDcgmCodecDecoderUtilizationDataPoint(now, v/100.0) /* normalize */ } if v, ok := gpu.Metrics.LastInt64("DCGM_FI_DEV_FB_FREE"); ok { s.mb.RecordGpuDcgmMemoryBytesUsedDataPoint(now, 1e6*v, metadata.AttributeGpuMemoryStateFree) /* MBy to By */ } if v, ok := gpu.Metrics.LastInt64("DCGM_FI_DEV_FB_USED"); ok { s.mb.RecordGpuDcgmMemoryBytesUsedDataPoint(now, 1e6*v, metadata.AttributeGpuMemoryStateUsed) /* MBy to By */ } if v, ok := gpu.Metrics.LastInt64("DCGM_FI_DEV_FB_RESERVED"); ok { s.mb.RecordGpuDcgmMemoryBytesUsedDataPoint(now, 1e6*v, metadata.AttributeGpuMemoryStateReserved) /* MBy to By */ } v, ok = gpu.Metrics.LastFloat64("DCGM_FI_PROF_DRAM_ACTIVE") if !ok { // fallback v, ok = gpu.Metrics.LastFloat64("DCGM_FI_DEV_MEM_COPY_UTIL") v /= 100.0 /* normalize */ } if ok { s.mb.RecordGpuDcgmMemoryBandwidthUtilizationDataPoint(now, v) } if v, ok := gpu.Metrics.IntegratedRate("DCGM_FI_PROF_PCIE_TX_BYTES"); ok { s.mb.RecordGpuDcgmPcieIoDataPoint(now, v, metadata.AttributeNetworkIoDirectionTransmit) } if v, ok := gpu.Metrics.IntegratedRate("DCGM_FI_PROF_PCIE_RX_BYTES"); ok { s.mb.RecordGpuDcgmPcieIoDataPoint(now, v, metadata.AttributeNetworkIoDirectionReceive) } if v, ok := gpu.Metrics.IntegratedRate("DCGM_FI_PROF_NVLINK_TX_BYTES"); ok { s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, v, metadata.AttributeNetworkIoDirectionTransmit) } if v, ok := gpu.Metrics.IntegratedRate("DCGM_FI_PROF_NVLINK_RX_BYTES"); ok { s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, v, metadata.AttributeNetworkIoDirectionReceive) } i, ok := gpu.Metrics.CumulativeTotal("DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION") v = float64(i) / 1e3 /* mJ to J */ if !ok { // fallback i, ok = gpu.Metrics.IntegratedRate("DCGM_FI_DEV_POWER_USAGE") v = float64(i) } if ok { s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, v) } if v, ok := gpu.Metrics.LastFloat64("DCGM_FI_DEV_GPU_TEMP"); ok { s.mb.RecordGpuDcgmTemperatureDataPoint(now, v) } if v, ok := gpu.Metrics.LastFloat64("DCGM_FI_DEV_SM_CLOCK"); ok { s.mb.RecordGpuDcgmClockFrequencyDataPoint(now, 1e6*v) /* MHz to Hz */ } if v, ok := gpu.Metrics.CumulativeTotal("DCGM_FI_DEV_POWER_VIOLATION"); ok { s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, float64(v)/1e9, metadata.AttributeGpuClockViolationPower) /* ns to s */ } if v, ok := gpu.Metrics.CumulativeTotal("DCGM_FI_DEV_THERMAL_VIOLATION"); ok { s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, float64(v)/1e9, metadata.AttributeGpuClockViolationThermal) /* ns to s */ } if v, ok := gpu.Metrics.CumulativeTotal("DCGM_FI_DEV_SYNC_BOOST_VIOLATION"); ok { s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, float64(v)/1e9, metadata.AttributeGpuClockViolationSyncBoost) /* ns to s */ } if v, ok := gpu.Metrics.CumulativeTotal("DCGM_FI_DEV_BOARD_LIMIT_VIOLATION"); ok { s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, float64(v)/1e9, metadata.AttributeGpuClockViolationBoardLimit) /* ns to s */ } if v, ok := gpu.Metrics.CumulativeTotal("DCGM_FI_DEV_LOW_UTIL_VIOLATION"); ok { s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, float64(v)/1e9, metadata.AttributeGpuClockViolationLowUtil) /* ns to s */ } if v, ok := gpu.Metrics.CumulativeTotal("DCGM_FI_DEV_RELIABILITY_VIOLATION"); ok { s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, float64(v)/1e9, metadata.AttributeGpuClockViolationReliability) /* ns to s */ } if v, ok := gpu.Metrics.CumulativeTotal("DCGM_FI_DEV_TOTAL_APP_CLOCKS_VIOLATION"); ok { s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, float64(v)/1e9, metadata.AttributeGpuClockViolationAppClock) /* ns to s */ } if v, ok := gpu.Metrics.CumulativeTotal("DCGM_FI_DEV_TOTAL_BASE_CLOCKS_VIOLATION"); ok { s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, float64(v)/1e9, metadata.AttributeGpuClockViolationBaseClock) /* ns to s */ } if v, ok := gpu.Metrics.CumulativeTotal("DCGM_FI_DEV_ECC_SBE_VOL_TOTAL"); ok { s.mb.RecordGpuDcgmEccErrorsDataPoint(now, v, metadata.AttributeGpuErrorTypeSbe) } if v, ok := gpu.Metrics.CumulativeTotal("DCGM_FI_DEV_ECC_DBE_VOL_TOTAL"); ok { s.mb.RecordGpuDcgmEccErrorsDataPoint(now, v, metadata.AttributeGpuErrorTypeDbe) } // TODO: XID errors. // s.mb.RecordGpuDcgmXidErrorsDataPoint(now, metric.asInt64(), xid) s.mb.EmitForResource(metadata.WithResource(gpuResource)) } return s.mb.Emit(), nil }