receiver/awsebsnvmereceiver/scraper.go (150 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package awsebsnvmereceiver
import (
"context"
"fmt"
"math"
"strings"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
"github.com/aws/amazon-cloudwatch-agent/internal/util/collections"
"github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver/internal/metadata"
"github.com/aws/amazon-cloudwatch-agent/receiver/awsebsnvmereceiver/internal/nvme"
)
type nvmeScraper struct {
logger *zap.Logger
mb *metadata.MetricsBuilder
nvme nvme.DeviceInfoProvider
allowedDevices collections.Set[string]
}
type ebsDevices struct {
volumeID string
deviceNames []string
}
type recordDataMetricFunc func(pcommon.Timestamp, int64)
// For unit testing
var getMetrics = nvme.GetMetrics
func (s *nvmeScraper) start(_ context.Context, _ component.Host) error {
s.logger.Debug("Starting NVMe scraper", zap.String("receiver", metadata.Type.String()))
return nil
}
func (s *nvmeScraper) shutdown(_ context.Context) error {
s.logger.Debug("Shutting down NVMe scraper", zap.String("receiver", metadata.Type.String()))
return nil
}
func (s *nvmeScraper) scrape(_ context.Context) (pmetric.Metrics, error) {
s.logger.Debug("Began scraping for NVMe metrics")
ebsDevicesByController, err := s.getEbsDevicesByController()
if err != nil {
return pmetric.NewMetrics(), err
}
now := pcommon.NewTimestampFromTime(time.Now())
for id, ebsDevices := range ebsDevicesByController {
// Some devices are owned by root:root, root:disk, etc, so the agent will attempt to
// retrieve the metric for a device (grouped by controller ID) until the first
// success
foundWorkingDevice := false
for _, device := range ebsDevices.deviceNames {
if foundWorkingDevice {
break
}
devicePath, err := s.nvme.DevicePath(device)
if err != nil {
s.logger.Debug("unable to get device path", zap.String("device", device), zap.Error(err))
continue
}
metrics, err := getMetrics(devicePath)
if err != nil {
s.logger.Debug("unable to get metrics for device", zap.String("device", device), zap.Error(err))
continue
}
foundWorkingDevice = true
rb := s.mb.NewResourceBuilder()
rb.SetVolumeID(ebsDevices.volumeID)
s.recordMetric(s.mb.RecordDiskioEbsTotalReadOpsDataPoint, now, metrics.ReadOps)
s.recordMetric(s.mb.RecordDiskioEbsTotalWriteOpsDataPoint, now, metrics.WriteOps)
s.recordMetric(s.mb.RecordDiskioEbsTotalReadBytesDataPoint, now, metrics.ReadBytes)
s.recordMetric(s.mb.RecordDiskioEbsTotalWriteBytesDataPoint, now, metrics.WriteBytes)
s.recordMetric(s.mb.RecordDiskioEbsTotalReadTimeDataPoint, now, metrics.TotalReadTime)
s.recordMetric(s.mb.RecordDiskioEbsTotalWriteTimeDataPoint, now, metrics.TotalWriteTime)
s.recordMetric(s.mb.RecordDiskioEbsVolumePerformanceExceededIopsDataPoint, now, metrics.EBSIOPSExceeded)
s.recordMetric(s.mb.RecordDiskioEbsVolumePerformanceExceededTpDataPoint, now, metrics.EBSThroughputExceeded)
s.recordMetric(s.mb.RecordDiskioEbsEc2InstancePerformanceExceededIopsDataPoint, now, metrics.EC2IOPSExceeded)
s.recordMetric(s.mb.RecordDiskioEbsEc2InstancePerformanceExceededTpDataPoint, now, metrics.EC2ThroughputExceeded)
s.recordMetric(s.mb.RecordDiskioEbsVolumeQueueLengthDataPoint, now, metrics.QueueLength)
s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
}
if foundWorkingDevice {
s.logger.Debug("emitted metrics for nvme device with controller id", zap.Int("controllerID", id), zap.String("volumeID", ebsDevices.volumeID))
} else {
s.logger.Debug("unable to get metrics for nvme device with controller id", zap.Int("controllerID", id), zap.String("volumeID", ebsDevices.volumeID))
}
}
return s.mb.Emit(), nil
}
// nvme0, nvme1, ... nvme{n} can have multiple devices with the same controller ID.
// For example nvme0n1, nvme0n1p1 are all under the controller ID 0. The metrics
// are the same based on the controller ID. We also do not want to duplicate metrics
// so we group the devices by the controller ID.
func (s *nvmeScraper) getEbsDevicesByController() (map[int]*ebsDevices, error) {
allNvmeDevices, err := s.nvme.GetAllDevices()
if err != nil {
return nil, err
}
devices := make(map[int]*ebsDevices)
for _, device := range allNvmeDevices {
deviceName := device.DeviceName()
// Check if all devices should be collected. Otherwise check if defined by user
hasAsterisk := s.allowedDevices.Contains("*")
if !hasAsterisk {
if isAllowed := s.allowedDevices.Contains(deviceName); !isAllowed {
s.logger.Debug("skipping un-allowed device", zap.String("device", deviceName))
continue
}
}
// NVMe device with the same controller ID was already seen. We do not need to repeat the work of
// retrieving the volume ID and validating if it's an EBS device
if entry, seenController := devices[device.Controller()]; seenController {
entry.deviceNames = append(entry.deviceNames, deviceName)
s.logger.Debug("skipping unnecessary device validation steps", zap.String("device", deviceName))
continue
}
isEbs, err := s.nvme.IsEbsDevice(&device)
if err != nil || !isEbs {
s.logger.Debug("skipping non-ebs nvme device", zap.String("device", deviceName), zap.Error(err))
continue
}
serial, err := s.nvme.GetDeviceSerial(&device)
if err != nil {
s.logger.Debug("unable to get serial number of device", zap.String("device", deviceName), zap.Error(err))
continue
}
// The serial should begin with vol and have content after the vol prefix
if !strings.HasPrefix(serial, "vol") || len(serial) < 4 {
s.logger.Debug("device serial is not a valid volume id", zap.String("device", deviceName), zap.String("serial", serial))
continue
}
devices[device.Controller()] = &ebsDevices{
deviceNames: []string{deviceName},
volumeID: fmt.Sprintf("vol-%s", serial[3:]),
}
}
return devices, nil
}
func newScraper(cfg *Config,
settings receiver.Settings,
nvme nvme.DeviceInfoProvider,
allowedDevices collections.Set[string],
) *nvmeScraper {
return &nvmeScraper{
logger: settings.TelemetrySettings.Logger,
mb: metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings),
nvme: nvme,
allowedDevices: allowedDevices,
}
}
func (s *nvmeScraper) recordMetric(recordFn recordDataMetricFunc, ts pcommon.Timestamp, val uint64) {
converted, err := safeUint64ToInt64(val)
if err != nil {
s.logger.Debug("skipping metric due to potential integer overflow")
return
}
recordFn(ts, converted)
}
func safeUint64ToInt64(value uint64) (int64, error) {
if value > math.MaxInt64 {
return 0, fmt.Errorf("value %d is too large for int64", value)
}
return int64(value), nil
}