metric/system/process/process.go (309 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//go:build (darwin && cgo) || freebsd || linux || windows || aix
package process
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"syscall"
"time"
psutil "github.com/shirou/gopsutil/v4/process"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/opt"
"github.com/elastic/elastic-agent-libs/transform/typeconv"
"github.com/elastic/elastic-agent-system-metrics/metric"
"github.com/elastic/elastic-agent-system-metrics/metric/system/network"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
"github.com/elastic/go-sysinfo"
sysinfotypes "github.com/elastic/go-sysinfo/types"
)
var errFetchingPIDs = "error fetching PID metrics for %d processes, most likely a \"permission denied\" error. Enable debug logging to determine the exact cause."
// ListStates is a wrapper that returns a list of processess with only the basic PID info filled out.
func ListStates(hostfs resolve.Resolver) ([]ProcState, error) {
init := Stats{
Hostfs: hostfs,
Procs: []string{".*"},
EnableCgroups: false,
skipExtended: true,
}
err := init.Init()
if err != nil {
return nil, fmt.Errorf("error initializing process collectors: %w", err)
}
// actually fetch the PIDs from the OS-specific code
pidMap, plist, err := init.FetchPids()
if err != nil && !isNonFatal(err) {
return nil, fmt.Errorf("error gathering PIDs: %w", err)
}
failedPIDs := extractFailedPIDs(pidMap)
if err != nil && len(failedPIDs) > 0 {
init.logger.Debugf("error fetching process metrics: %v", err)
return plist, NonFatalErr{Err: fmt.Errorf(errFetchingPIDs, len(failedPIDs))}
}
return plist, toNonFatal(err)
}
// GetPIDState returns the state of a given PID
// It will return ErrProcNotExist if the process was not found.
func GetPIDState(hostfs resolve.Resolver, pid int) (PidState, error) {
// This library still doesn't have a good cross-platform way to distinguish between "does not eixst" and other process errors.
// This is a fairly difficult problem to solve in a cross-platform way
exists, err := psutil.PidExistsWithContext(context.Background(), int32(pid))
if err != nil {
return "", fmt.Errorf("Error trying to find process: %d: %w", pid, err)
}
if !exists {
return "", ErrProcNotExist
}
// GetInfoForPid will return the smallest possible dataset for a PID
procState, err := GetInfoForPid(hostfs, pid)
if err != nil {
return "", fmt.Errorf("error getting state info for pid %d: %w", pid, err)
}
return procState.State, nil
}
// Get fetches the configured processes and returns a list of formatted events and root ECS fields
func (procStats *Stats) Get() ([]mapstr.M, []mapstr.M, error) {
// If the user hasn't configured any kind of process glob, return
if len(procStats.Procs) == 0 {
return nil, nil, nil
}
// actually fetch the PIDs from the OS-specific code
pidMap, plist, wrappedErr := procStats.FetchPids()
if wrappedErr != nil && !isNonFatal(wrappedErr) {
return nil, nil, fmt.Errorf("error gathering PIDs: %w", wrappedErr)
}
failedPIDs := extractFailedPIDs(pidMap)
// We use this to track processes over time.
procStats.ProcsMap.SetMap(pidMap)
// filter the process list that will be passed down to users
plist = procStats.includeTopProcesses(plist)
// This is a holdover until we migrate this library to metricbeat/internal
// At which point we'll use the memory code there.
var totalPhyMem uint64
if procStats.host != nil {
memStats, err := procStats.host.Memory()
if err != nil {
procStats.logger.Warnf("Getting memory details: %v", err)
} else {
totalPhyMem = memStats.Total
}
}
// Format the list to the MapStr type used by the outputs
procs := make([]mapstr.M, 0, len(plist))
rootEvents := make([]mapstr.M, 0, len(plist))
for _, process := range plist {
process := process
// Add the RSS pct memory first
process.Memory.Rss.Pct = GetProcMemPercentage(process, totalPhyMem)
// Create the root event
rootMap := processRootEvent(&process)
proc, err := procStats.getProcessEvent(&process)
if err != nil {
return nil, nil, fmt.Errorf("error converting process for pid %d: %w", process.Pid.ValueOr(0), err)
}
procs = append(procs, proc)
rootEvents = append(rootEvents, rootMap)
}
if wrappedErr != nil && len(failedPIDs) > 0 {
procStats.logger.Debugf("error fetching process metrics: %v", wrappedErr)
return procs, rootEvents, NonFatalErr{Err: fmt.Errorf(errFetchingPIDs, len(failedPIDs))}
}
return procs, rootEvents, toNonFatal(wrappedErr)
}
// GetOne fetches process data for a given PID if its name matches the regexes provided from the host.
func (procStats *Stats) GetOne(pid int) (mapstr.M, error) {
pidStat, _, err := procStats.pidFill(pid, false)
if err != nil && !isNonFatal(err) {
return nil, fmt.Errorf("error fetching PID %d: %w", pid, err)
}
procStats.ProcsMap.SetPid(pid, pidStat)
return procStats.getProcessEvent(&pidStat)
}
// GetOneRootEvent is the same as `GetOne()` but it returns an
// event formatted as expected by ECS
func (procStats *Stats) GetOneRootEvent(pid int) (mapstr.M, mapstr.M, error) {
pidStat, _, wrappedErr := procStats.pidFill(pid, false)
if wrappedErr != nil && !isNonFatal(wrappedErr) {
return nil, nil, fmt.Errorf("error fetching PID %d: %w", pid, wrappedErr)
}
procStats.ProcsMap.SetPid(pid, pidStat)
procMap, err := procStats.getProcessEvent(&pidStat)
if err != nil {
return nil, nil, fmt.Errorf("error formatting process %d: %w", pid, err)
}
rootMap := processRootEvent(&pidStat)
return procMap, rootMap, toNonFatal(wrappedErr)
}
// GetSelf gets process info for the beat itself
// Be advised that if you call this method on a Stats object that was created with an alternate
// `Hostfs` setting, this method will return data for that pid as it exists on that hostfs.
// For example, if called from inside a container with a `hostfs` path for the container host,
// the PID in the ProcState object will be the PID as the host sees it.
func (procStats *Stats) GetSelf() (ProcState, error) {
self, err := GetSelfPid(procStats.Hostfs)
if err != nil {
return ProcState{}, fmt.Errorf("error finding PID: %w", err)
}
pidStat, _, err := procStats.pidFill(self, false)
if err != nil && !isNonFatal(err) {
return ProcState{}, fmt.Errorf("error fetching PID %d: %w", self, err)
}
procStats.ProcsMap.SetPid(self, pidStat)
return pidStat, toNonFatal(err)
}
// pidIter wraps a few lines of generic code that all OS-specific FetchPids() functions must call.
// this also handles the process of adding to the maps/lists in order to limit the code duplication in all the OS implementations
// NOTE: this method will sometimes return a NonFatalError{} wrapper for errors that can optionally be ignored.
func (procStats *Stats) pidIter(pid int, procMap ProcsMap, proclist []ProcState) (ProcsMap, []ProcState, error) {
status, saved, err := procStats.pidFill(pid, true)
var nonFatalErr error
if err != nil {
procMap[pid] = ProcState{Failed: true}
if !errors.Is(err, NonFatalErr{}) {
procStats.logger.Debugf("Error fetching PID info for %d, skipping: %s", pid, err)
// While monitoring a set of processes, some processes might get killed after we get all the PIDs
// So, there's no need to capture "process not found" error.
if errors.Is(err, syscall.ESRCH) {
return procMap, proclist, nil
}
return procMap, proclist, err
}
nonFatalErr = fmt.Errorf("error for pid %d: %w", pid, err)
procStats.logger.Debugf(err.Error())
}
if !saved {
procStats.logger.Debugf("Process name does not match the provided regex; PID=%d; name=%s", pid, status.Name)
return procMap, proclist, nonFatalErr
}
// there was some non-fatal error and given state is partial
if nonFatalErr != nil {
status.Partial = true
}
procMap[pid] = status
proclist = append(proclist, status)
return procMap, proclist, nonFatalErr
}
// pidFill is an entrypoint used by OS-specific code to fill out a pid.
// This in turn calls various OS-specific code to fill out the various bits of PID data
// This is done to minimize the code duplication between different OS implementations
// The second return value will only be false if an event has been filtered out.
func (procStats *Stats) pidFill(pid int, filter bool) (ProcState, bool, error) {
// Fetch proc state so we can get the name for filtering based on user's filter.
var wrappedErr error
// OS-specific entrypoint, get basic info so we can at least run matchProcess
status, err := GetInfoForPid(procStats.Hostfs, pid)
if err != nil {
return status, true, fmt.Errorf("GetInfoForPid failed for pid %d: %w", pid, err)
}
if procStats.skipExtended {
return status, true, nil
}
// Some OSes use the cache to avoid expensive system calls,
// cacheCmdLine reads from the cache.
status = procStats.cacheCmdLine(status)
// Filter based on user-supplied func
if filter {
if !procStats.matchProcess(status.Name) {
return status, false, nil
}
}
// If we've passed the filter, continue to fill out the rest of the metrics
status, err = FillPidMetrics(procStats.Hostfs, pid, status, procStats.isWhitelistedEnvVar)
if err != nil {
if !errors.Is(err, NonFatalErr{}) {
return status, true, fmt.Errorf("FillPidMetrics failed for PID %d: %w", pid, err)
}
wrappedErr = errors.Join(wrappedErr, err)
procStats.logger.Debugf(wrappedErr.Error())
}
if status.CPU.Total.Ticks.Exists() {
status.CPU.Total.Value = opt.FloatWith(metric.Round(float64(status.CPU.Total.Ticks.ValueOr(0))))
}
// postprocess with cgroups and percentages
last, ok := procStats.ProcsMap.GetPid(status.Pid.ValueOr(0))
status.SampleTime = time.Now()
if ok {
status = GetProcCPUPercentage(last, status)
}
if procStats.EnableCgroups {
cgStats, err := procStats.cgroups.GetStatsForPid(status.Pid.ValueOr(0))
if err != nil {
procStats.logger.Debugf("Non-fatal error fetching cgroups metrics for pid %d, metrics are valid but partial: %s", pid, err)
} else {
status.Cgroup = cgStats
if ok {
status.Cgroup.FillPercentages(last.Cgroup, status.SampleTime, last.SampleTime)
}
}
} // end cgroups processor
if _, isExcluded := procStats.excludedPIDs[uint64(pid)]; !isExcluded {
status, err = FillMetricsRequiringMoreAccess(pid, status)
if err != nil {
procStats.logger.Debugf("error calling FillMetricsRequiringMoreAccess for pid %d: %w", pid, err)
}
// Generate `status.Cmdline` here for compatibility because on Windows
// `status.Args` is set by `FillMetricsRequiringMoreAccess`.
if len(status.Args) > 0 && status.Cmdline == "" {
status.Cmdline = strings.Join(status.Args, " ")
}
}
// network data
if procStats.EnableNetwork {
procHandle, err := sysinfo.Process(pid)
// treat this as a soft error
if err != nil {
procStats.logger.Debugf("error initializing process handler for pid %d while trying to fetch network data: %w", pid, err)
} else {
procNet, ok := procHandle.(sysinfotypes.NetworkCounters)
if ok {
status.Network, err = procNet.NetworkCounters()
if err != nil {
procStats.logger.Debugf("error fetching network counters for process %d: %w", pid, err)
}
}
}
}
return status, true, wrappedErr
}
// cacheCmdLine fills out Env and arg metrics from any stored previous metrics for the pid
func (procStats *Stats) cacheCmdLine(in ProcState) ProcState {
if previousProc, ok := procStats.ProcsMap.GetPid(in.Pid.ValueOr(0)); ok {
if procStats.CacheCmdLine {
in.Args = previousProc.Args
in.Cmdline = previousProc.Cmdline
}
env := previousProc.Env
in.Env = env
}
return in
}
// return a formatted MapStr of the process metrics
func (procStats *Stats) getProcessEvent(process *ProcState) (mapstr.M, error) {
// Remove CPUTicks if needed
if !procStats.CPUTicks {
process.CPU.User.Ticks = opt.NewUintNone()
process.CPU.System.Ticks = opt.NewUintNone()
process.CPU.Total.Ticks = opt.NewUintNone()
}
proc := mapstr.M{}
err := typeconv.Convert(&proc, process)
if procStats.EnableNetwork && process.Network != nil {
proc["network"] = network.MapProcNetCountersWithFilter(process.Network, procStats.NetworkMetrics)
}
return proc, err
}
// matchProcess checks if the provided process name matches any of the process regexes
func (procStats *Stats) matchProcess(name string) bool {
for _, reg := range procStats.procRegexps {
if reg.MatchString(name) {
return true
}
}
return false
}
// includeTopProcesses filters down the metrics based on top CPU or top Memory settings
func (procStats *Stats) includeTopProcesses(processes []ProcState) []ProcState {
if !procStats.IncludeTop.Enabled ||
(procStats.IncludeTop.ByCPU == 0 && procStats.IncludeTop.ByMemory == 0) {
return processes
}
var result []ProcState
if procStats.IncludeTop.ByCPU > 0 {
numProcs := procStats.IncludeTop.ByCPU
if len(processes) < procStats.IncludeTop.ByCPU {
numProcs = len(processes)
}
sort.Slice(processes, func(i, j int) bool {
return processes[i].CPU.Total.Pct.ValueOr(0) > processes[j].CPU.Total.Pct.ValueOr(0)
})
result = append(result, processes[:numProcs]...)
}
if procStats.IncludeTop.ByMemory > 0 {
numProcs := procStats.IncludeTop.ByMemory
if len(processes) < procStats.IncludeTop.ByMemory {
numProcs = len(processes)
}
sort.Slice(processes, func(i, j int) bool {
return processes[i].Memory.Rss.Bytes.ValueOr(0) > processes[j].Memory.Rss.Bytes.ValueOr(0)
})
for _, proc := range processes[:numProcs] {
proc := proc
if !isProcessInSlice(result, &proc) {
result = append(result, proc)
}
}
}
return result
}
// isWhitelistedEnvVar returns true if the given variable name is a match for
// the whitelist. If the whitelist is empty it returns false.
func (procStats *Stats) isWhitelistedEnvVar(varName string) bool {
if len(procStats.envRegexps) == 0 {
return false
}
for _, p := range procStats.envRegexps {
if p.MatchString(varName) {
return true
}
}
return false
}
func extractFailedPIDs(procMap ProcsMap) []int {
// calculate the total amount of partial/failed PIDs
list := make([]int, 0)
for pid, state := range procMap {
if state.Failed {
list = append(list, pid)
// delete the failed state so we don't return the state to caller
delete(procMap, pid)
} else if state.Partial {
list = append(list, pid)
}
}
return list
}