metricbeat/module/linux/rapl/rapl.go (188 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//go:build linux
package rapl
import (
"errors"
"fmt"
"io/ioutil"
"os"
"os/user"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
"github.com/fearful-symmetry/gorapl/rapl"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)
// init registers the rapl MetricSet under the linux module in the central
// metricbeat registry when the package is loaded. New is handed over as the
// factory, so it is invoked later whenever the metricset is instantiated from
// configuration; Fetch is then called periodically on the created instance.
func init() {
	const (
		moduleName    = "linux"
		metricSetName = "rapl"
	)
	mb.Registry.MustAddMetricSet(moduleName, metricSetName, New)
}
// config holds the rapl-specific options unpacked from the module
// configuration by New.
type config struct {
// UseMSRSafe selects the msr_safe device instead of the plain msr device when
// New builds the per-CPU device path. When it is false, New refuses to start
// unless the current user has UID 0.
UseMSRSafe bool `config:"rapl.use_msr_safe"`
}
// MetricSet holds any configuration or state information. It must implement
// the mb.MetricSet interface. And this is best achieved by embedding
// mb.BaseMetricSet because it implements all of the required mb.MetricSet
// interface methods except for Fetch.
type MetricSet struct {
mb.BaseMetricSet
// handlers maps a CPU number to the RAPL handler New created for it.
handlers map[int]rapl.RAPLHandler
// lastValues keeps the energy readings taken by the previous updatePower
// call, keyed by CPU number and then by RAPL domain. It is nil until
// updatePower has run once.
lastValues map[int]map[rapl.RAPLDomain]energyTrack
}
// energyTrack is a single energy reading for one RAPL domain together with the
// moment it was taken. updatePower stores one per domain so the next call can
// compute a rate from two consecutive readings.
type energyTrack struct {
// joules is the value returned by the handler's ReadEnergyStatus call.
joules float64
// time is the time.Now() value captured right after the reading.
time time.Time
}
// energyUsage is the per-domain result produced by updatePower and published
// by Fetch.
type energyUsage struct {
// joules is the most recent energy reading for the domain.
joules float64
// watts is the change in joules between two consecutive readings divided by
// the seconds elapsed between them.
watts float64
}
// New creates a new instance of the MetricSet.
//
// It unpacks the metricset configuration, discovers one CPU per physical
// package, and verifies that the selected MSR device is usable: with
// rapl.use_msr_safe the msr_safe device of the first discovered CPU must exist,
// otherwise the process must run as UID 0. It then creates one RAPL handler
// per discovered CPU and takes an initial energy reading so that the first
// Fetch has a baseline to compute power from.
//
// An error is returned when the configuration cannot be unpacked, the module
// cannot resolve host filesystem paths, no CPU is found, the device/permission
// check fails, or a handler cannot be created.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
	cfgwarn.Beta("The linux rapl metricset is beta.")

	cfg := config{}
	if err := base.Module().UnpackConfig(&cfg); err != nil {
		return nil, err
	}

	// Use the comma-ok form: a module that cannot resolve hostfs paths must
	// surface as a configuration error, not as a panic during startup.
	sys, ok := base.Module().(resolve.Resolver)
	if !ok {
		return nil, fmt.Errorf("module %T does not implement resolve.Resolver", base.Module())
	}

	// getMSRCPUs never returns an empty list together with a nil error, so
	// indexing cpuList[0] below is safe.
	cpuList, err := getMSRCPUs(sys)
	if err != nil {
		return nil, fmt.Errorf("error getting list of CPUs to query: %w", err)
	}

	msrDevice := "msr"
	if cfg.UseMSRSafe {
		msrDevice = "msr_safe"
		// check to see if msr-safe is installed
		queryPath := sys.ResolveHostFS(filepath.Join("/dev/cpu/", fmt.Sprint(cpuList[0]), "msr_safe"))
		_, err := os.Stat(queryPath)
		if errors.Is(err, os.ErrNotExist) {
			return nil, errors.New("no msr_safe device found. Is the kernel module loaded?")
		}
		if err != nil {
			return nil, fmt.Errorf("could not check msr_safe device at %s: %w", queryPath, err)
		}
	} else {
		currentUser, err := user.Current()
		if err != nil {
			return nil, fmt.Errorf("error fetching user list: %w", err)
		}
		if currentUser.Uid != "0" {
			return nil, errors.New("linux/rapl must run as root if not using msr-safe")
		}
	}

	// The path template is the same for every CPU, so build it once.
	// CreateNewHandler receives the CPU number next to the template;
	// presumably it expands the %d placeholder itself.
	formatPath := filepath.Join(sys.ResolveHostFS("/dev/cpu/%d"), msrDevice)

	handlers := make(map[int]rapl.RAPLHandler, len(cpuList))
	for _, cpu := range cpuList {
		handler, err := rapl.CreateNewHandler(cpu, formatPath)
		if err != nil {
			return nil, fmt.Errorf("error creating handler at path %s for CPU %d: %w", formatPath, cpu, err)
		}
		handlers[cpu] = handler
	}

	ms := &MetricSet{
		BaseMetricSet: base,
		handlers:      handlers,
	}

	// Prime lastValues; the returned usage is meaningless without a previous
	// sample and is intentionally discarded.
	ms.updatePower()

	return ms, nil
}
// Fetch takes a fresh power sample via updatePower and publishes one event per
// CPU. Each event carries the CPU number under "core" and, for every RAPL
// domain that produced a value, an object keyed by the lower-cased domain name
// holding the rounded "watts" and "joules" figures. It always returns nil.
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
	for core, domains := range m.updatePower() {
		fields := mapstr.M{"core": core}

		for domain, usage := range domains {
			key := strings.ToLower(domain.Name)
			fields[key] = mapstr.M{
				"watts":  common.Round(usage.watts, common.DefaultDecimalPlacesCount),
				"joules": common.Round(usage.joules, common.DefaultDecimalPlacesCount),
			}
		}

		event := mb.Event{MetricSetFields: fields}
		report.Event(event)
	}

	return nil
}
// updatePower reads the energy counter of every RAPL domain on every tracked
// CPU, stores the readings in m.lastValues as the baseline for the next call,
// and returns the power drawn since the previous call.
//
// The result is keyed by CPU number and then by domain; every tracked CPU has
// an entry, which may be empty. A domain is left out of the result when
//   - its MSR does not exist or could not be read,
//   - there is no previous reading to compare against (first call, or the
//     previous read of that domain failed),
//   - the counter rolled over since the previous reading, or
//   - no time has elapsed between the two readings.
//
// In the last three cases the new reading is still recorded, so the following
// call can report the domain again.
func (m *MetricSet) updatePower() map[int]map[rapl.RAPLDomain]energyUsage {
	newEnergy := make(map[int]map[rapl.RAPLDomain]energyTrack, len(m.handlers))
	powerUsage := make(map[int]map[rapl.RAPLDomain]energyUsage, len(m.handlers))

	for cpu, handler := range m.handlers {
		usage := make(map[rapl.RAPLDomain]energyUsage)
		samples := make(map[rapl.RAPLDomain]energyTrack)
		// Indexing a nil map is safe and yields a nil inner map, so this also
		// covers the very first call when m.lastValues has not been set yet.
		previous := m.lastValues[cpu]

		for _, domain := range handler.GetDomains() {
			joules, err := handler.ReadEnergyStatus(domain)
			// This is a bit hard to check for, as many of the registers are model-specific
			// Unless we want to maintain a map of every CPU, we sort of have to play it fast and loose.
			if errors.Is(err, rapl.ErrMSRDoesNotExist) {
				continue
			}
			if err != nil {
				m.Logger().Infof("Error reading MSR from domain %s: %s skipping.", domain.Name, err)
				continue
			}

			current := energyTrack{joules: joules, time: time.Now()}
			samples[domain] = current

			// Without a previous reading there is nothing to take a delta
			// against; comparing with a zero-valued energyTrack would yield a
			// meaningless wattage.
			last, ok := previous[domain]
			if !ok {
				continue
			}
			// This register can roll over. If/when it does, skip reporting
			if last.joules > joules {
				continue
			}
			// Guard the division below against a zero (or negative) interval.
			elapsed := current.time.Sub(last.time).Seconds()
			if elapsed <= 0 {
				continue
			}

			// divide the delta of joules by the time interval to get watts
			usage[domain] = energyUsage{
				watts:  (joules - last.joules) / elapsed,
				joules: joules,
			}
		}

		newEnergy[cpu] = samples
		powerUsage[cpu] = usage
	}

	m.lastValues = newEnergy
	return powerUsage
}
// getMSRCPUs builds the list of CPU numbers to query: the first CPU recorded
// for each physical package reported by topoPkgCPUMap. On multi-processor
// systems the list therefore has more than one entry. The order of the list is
// unspecified because it follows map iteration. An error is returned when the
// topology cannot be read or when it yields no CPU at all.
func getMSRCPUs(hostfs resolve.Resolver) ([]int, error) {
	pkgToCores, err := topoPkgCPUMap(hostfs)
	if err != nil {
		return nil, fmt.Errorf("error fetching CPU topology: %w", err)
	}

	selected := []int{}
	for pkg := range pkgToCores {
		selected = append(selected, pkgToCores[pkg][0])
	}

	if len(selected) > 0 {
		return selected, nil
	}

	// if we don't have any cores, assume something has gone wrong
	return selected, errors.New("no cores found")
}
// topoPkgCPUMap groups the CPUs found under /sys/devices/system/cpu/ (resolved
// through hostfs) by physical package. The returned map is keyed by the value
// read from cpuN/topology/physical_package_id and holds the CPU numbers N that
// belong to that package, in directory-listing order.
//
// Only directories named exactly "cpu<digits>" are considered. A CPU directory
// without a physical_package_id file is skipped rather than treated as an
// error; offline CPUs do not expose their topology in sysfs.
//
// It is not clear how portable this approach is, but it is the simplest one.
// The Intel power gadget instead iterates over each CPU using affinity masks
// and runs `cpuid` in a loop. Reading physical_package_id is what lscpu does;
// geopm possibly does something similar.
func topoPkgCPUMap(hostfs resolve.Resolver) (map[int][]int, error) {
	const sysdir = "/sys/devices/system/cpu/"

	entries, err := ioutil.ReadDir(hostfs.ResolveHostFS(sysdir))
	if err != nil {
		return nil, err
	}

	// Anchored so that only "cpuN" matches, not any name merely containing it;
	// the capture group is the CPU number.
	cpuDir := regexp.MustCompile(`^cpu([0-9]+)$`)

	cpuMap := make(map[int][]int)
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		match := cpuDir.FindStringSubmatch(entry.Name())
		if match == nil {
			continue
		}
		// Can only fail when the number does not fit in an int.
		cpuCore, err := strconv.Atoi(match[1])
		if err != nil {
			return nil, fmt.Errorf("error fetching CPU core value from string %s: %w", entry.Name(), err)
		}

		fullPkg := hostfs.ResolveHostFS(filepath.Join(sysdir, entry.Name(), "/topology/physical_package_id"))
		dat, err := ioutil.ReadFile(fullPkg)
		if errors.Is(err, os.ErrNotExist) {
			// No topology information for this CPU (e.g. it is offline):
			// it cannot be queried, so leave it out instead of failing.
			continue
		}
		if err != nil {
			return nil, fmt.Errorf("error reading file %s: %w", fullPkg, err)
		}

		phys, err := strconv.ParseInt(strings.TrimSpace(string(dat)), 10, 64)
		if err != nil {
			return nil, fmt.Errorf("error parsing value from %s: %w", fullPkg, err)
		}

		// append on a missing key starts from a nil slice, so no existence
		// check is needed.
		pkg := int(phys)
		cpuMap[pkg] = append(cpuMap[pkg], cpuCore)
	}

	return cpuMap, nil
}