receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go (365 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package processscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper"
import (
"context"
"errors"
"fmt"
"runtime"
"time"
"github.com/shirou/gopsutil/v4/cpu"
"github.com/shirou/gopsutil/v4/host"
"github.com/shirou/gopsutil/v4/process"
"github.com/tilinna/clock"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/scraper"
"go.opentelemetry.io/collector/scraper/scrapererror"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper/ucal"
)
// Number of metrics attributed to each per-process collector. These values are
// the failure counts passed to AddPartial when the corresponding read fails.
const (
	cpuMetricsLen               = 1
	memoryMetricsLen            = 2 // memory usage and virtual memory
	memoryUtilizationMetricsLen = 1
	diskMetricsLen              = 1
	pagingMetricsLen            = 1
	threadMetricsLen            = 1
	contextSwitchMetricsLen     = 1
	fileDescriptorMetricsLen    = 1
	handleMetricsLen            = 1
	signalMetricsLen            = 1
	uptimeMetricsLen            = 1
)

// metricsLen is the sum of the per-collector metric counts.
//
// NOTE(review): the sum uses handleCountMetricsLen, which is not declared in
// this file (presumably in another, possibly platform-specific, file of this
// package), rather than handleMetricsLen above — confirm this is intentional.
const metricsLen = cpuMetricsLen + memoryMetricsLen + memoryUtilizationMetricsLen +
	diskMetricsLen + pagingMetricsLen + threadMetricsLen + contextSwitchMetricsLen +
	fileDescriptorMetricsLen + handleCountMetricsLen + signalMetricsLen + uptimeMetricsLen
// processScraper scrapes per-process metrics. newProcessScraper builds it,
// start creates its MetricsBuilder, and each scrape call records and emits one
// resource per matching process.
type processScraper struct {
	// settings is passed to metadata.NewMetricsBuilder in start; config is the
	// scraper configuration; mb is nil until start has been called.
	settings scraper.Settings
	config   *Config
	mb       *metadata.MetricsBuilder

	// Process name filters. Each is nil when the corresponding Names list in
	// the config is empty, and a nil filter is not applied.
	includeFS filterset.FilterSet
	excludeFS filterset.FilterSet

	// Processes whose age (now minus create time, in milliseconds) is below
	// scrapeProcessDelay are skipped.
	scrapeProcessDelay time.Duration

	// ucals holds one CPU utilization calculator per PID; entries for PIDs
	// absent from a scrape are removed at the end of that scrape.
	// logicalCores is the logical CPU count captured at construction and
	// passed to those calculators.
	ucals        map[int32]*ucal.CPUUtilizationCalculator
	logicalCores int

	// for mocking: newProcessScraper defaults these to
	// processHandle.CreateTimeWithContext and getGopsutilProcessHandles.
	getProcessCreateTime func(p processHandle, ctx context.Context) (int64, error)
	getProcessHandles    func(context.Context) (processHandles, error)
}
// newProcessScraper creates a Process Scraper
func newProcessScraper(settings scraper.Settings, cfg *Config) (*processScraper, error) {
scraper := &processScraper{
settings: settings,
config: cfg,
getProcessCreateTime: processHandle.CreateTimeWithContext,
getProcessHandles: getGopsutilProcessHandles,
scrapeProcessDelay: cfg.ScrapeProcessDelay,
ucals: make(map[int32]*ucal.CPUUtilizationCalculator),
}
var err error
if len(cfg.Include.Names) > 0 {
scraper.includeFS, err = filterset.CreateFilterSet(cfg.Include.Names, &cfg.Include.Config)
if err != nil {
return nil, fmt.Errorf("error creating process include filters: %w", err)
}
}
if len(cfg.Exclude.Names) > 0 {
scraper.excludeFS, err = filterset.CreateFilterSet(cfg.Exclude.Names, &cfg.Exclude.Config)
if err != nil {
return nil, fmt.Errorf("error creating process exclude filters: %w", err)
}
}
logicalCores, err := cpu.Counts(true)
if err != nil {
return nil, fmt.Errorf("error getting number of logical cores: %w", err)
}
scraper.logicalCores = logicalCores
return scraper, nil
}
// start creates the MetricsBuilder used to record and emit data points. The
// context and host arguments are unused and it never returns an error.
func (s *processScraper) start(_ context.Context, _ component.Host) error {
	builder := metadata.NewMetricsBuilder(s.config.MetricsBuilderConfig, s.settings)
	s.mb = builder
	return nil
}
// scrape collects metrics for every process returned by getProcessMetadata and
// emits them as one resource per process.
//
// A failure in any single collector is recorded as a partial scrape error so
// the remaining metrics for that process are still emitted. A non-partial
// error from getProcessMetadata aborts the scrape with empty metrics. When
// MuteProcessAllErrors is set, the collected metrics are returned with a nil
// error regardless of any failures.
func (s *processScraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
	var errs scrapererror.ScrapeErrors

	// With the boot time cache feature gate disabled, force one fresh boot
	// time read so the cached value used during this scrape is current. This
	// matches the pre-cache behavior except when the boot time changes in the
	// middle of a scrape.
	if !bootTimeCacheFeaturegate.IsEnabled() {
		host.EnableBootTimeCache(false)
		if _, bootErr := host.BootTimeWithContext(ctx); bootErr != nil {
			errs.AddPartial(1, fmt.Errorf(`retrieving boot time failed with error "%w", using cached boot time`, bootErr))
		}
		host.EnableBootTimeCache(true)
	}

	processes, err := s.getProcessMetadata(ctx)
	if err != nil {
		var partial scrapererror.PartialScrapeError
		if !errors.As(err, &partial) {
			return pmetric.NewMetrics(), err
		}
		errs.AddPartial(partial.Failed, partial)
	}

	seen := make(map[int32]struct{}, len(processes))
	for _, proc := range processes {
		pid, h := proc.pid, proc.handle
		seen[pid] = struct{}{}
		ts := pcommon.NewTimestampFromTime(clock.Now(ctx))

		if e := s.scrapeAndAppendCPUTimeMetric(ctx, ts, h, pid); e != nil {
			errs.AddPartial(cpuMetricsLen, fmt.Errorf("error reading cpu times for process %q (pid %v): %w", proc.executable.name, pid, e))
		}
		if e := s.scrapeAndAppendMemoryUsageMetrics(ctx, ts, h); e != nil {
			errs.AddPartial(memoryMetricsLen, fmt.Errorf("error reading memory info for process %q (pid %v): %w", proc.executable.name, pid, e))
		}
		if e := s.scrapeAndAppendMemoryUtilizationMetric(ctx, ts, h); e != nil {
			errs.AddPartial(memoryUtilizationMetricsLen, fmt.Errorf("error reading memory utilization for process %q (pid %v): %w", proc.executable.name, pid, e))
		}
		if e := s.scrapeAndAppendDiskMetrics(ctx, ts, h); e != nil && !s.config.MuteProcessIOError {
			errs.AddPartial(diskMetricsLen, fmt.Errorf("error reading disk usage for process %q (pid %v): %w", proc.executable.name, pid, e))
		}
		if e := s.scrapeAndAppendPagingMetric(ctx, ts, h); e != nil {
			errs.AddPartial(pagingMetricsLen, fmt.Errorf("error reading memory paging info for process %q (pid %v): %w", proc.executable.name, pid, e))
		}
		if e := s.scrapeAndAppendThreadsMetrics(ctx, ts, h); e != nil {
			errs.AddPartial(threadMetricsLen, fmt.Errorf("error reading thread info for process %q (pid %v): %w", proc.executable.name, pid, e))
		}
		if e := s.scrapeAndAppendContextSwitchMetrics(ctx, ts, h); e != nil {
			errs.AddPartial(contextSwitchMetricsLen, fmt.Errorf("error reading context switch counts for process %q (pid %v): %w", proc.executable.name, pid, e))
		}
		if e := s.scrapeAndAppendOpenFileDescriptorsMetric(ctx, ts, h); e != nil {
			errs.AddPartial(fileDescriptorMetricsLen, fmt.Errorf("error reading open file descriptor count for process %q (pid %v): %w", proc.executable.name, pid, e))
		}
		if e := s.scrapeAndAppendHandlesMetric(ctx, ts, h); e != nil {
			errs.AddPartial(handleMetricsLen, fmt.Errorf("error reading handle count for process %q (pid %v): %w", proc.executable.name, pid, e))
		}
		if e := s.scrapeAndAppendSignalsPendingMetric(ctx, ts, h); e != nil {
			errs.AddPartial(signalMetricsLen, fmt.Errorf("error reading pending signals for process %q (pid %v): %w", proc.executable.name, pid, e))
		}
		if e := s.scrapeAndAppendUptimeMetric(ctx, ts, h); e != nil {
			errs.AddPartial(uptimeMetricsLen, fmt.Errorf("error calculating uptime for process %q (pid %v): %w", proc.executable.name, pid, e))
		}

		// createTime is in milliseconds; the start time override needs nanoseconds.
		rb := s.mb.NewResourceBuilder()
		s.mb.EmitForResource(
			metadata.WithResource(proc.buildResource(rb)),
			metadata.WithStartTimeOverride(pcommon.Timestamp(proc.createTime*1e6)),
		)
	}

	// Drop CPU utilization calculators for PIDs that were not part of this scrape.
	for pid := range s.ucals {
		if _, ok := seen[pid]; !ok {
			delete(s.ucals, pid)
		}
	}

	metrics := s.mb.Emit()
	if s.config.MuteProcessAllErrors {
		return metrics, nil
	}
	return metrics, errs.Combine()
}
// getProcessMetadata returns a processMetadata entry, including its handle,
// for every running process that passes the configured name filters and the
// scrape_process_delay check.
//
// Failures reading a process's executable, name or cgroup are recorded as
// partial errors counting one failed process each, unless muted by the
// corresponding config flag. A process whose name or cgroup cannot be read is
// skipped. Name filters are applied before the cgroup is read, so processes
// that are filtered out never trigger a cgroup read or a cgroup error.
// Failures reading the command, username, create time or parent PID are
// recorded as partial errors with a zero failure count, and the process is
// still returned.
//
// An error from listing process handles is returned as-is with no data.
// Otherwise the combined partial errors are returned together with every
// process that was collected successfully.
func (s *processScraper) getProcessMetadata(ctx context.Context) ([]*processMetadata, error) {
	handles, err := s.getProcessHandles(ctx)
	if err != nil {
		return nil, err
	}

	var errs scrapererror.ScrapeErrors
	data := make([]*processMetadata, 0, handles.Len())
	for i := 0; i < handles.Len(); i++ {
		pid := handles.Pid(i)
		handle := handles.At(i)

		// An executable read failure does not skip the process; exe (possibly
		// empty) is still passed on to getProcessName.
		exe, err := getProcessExecutable(ctx, handle)
		if err != nil && !s.config.MuteProcessExeError {
			errs.AddPartial(1, fmt.Errorf("error reading process executable for pid %v: %w", pid, err))
		}

		name, err := getProcessName(ctx, handle, exe)
		if err != nil {
			if !s.config.MuteProcessNameError {
				errs.AddPartial(1, fmt.Errorf("error reading process name for pid %v: %w", pid, err))
			}
			continue
		}

		// Filter by name before any further per-process reads. This avoids
		// reading the cgroup of processes the user has filtered out and
		// reporting cgroup errors for them.
		if (s.includeFS != nil && !s.includeFS.Matches(name)) ||
			(s.excludeFS != nil && s.excludeFS.Matches(name)) {
			continue
		}

		cgroup, err := getProcessCgroup(ctx, handle)
		if err != nil {
			if !s.config.MuteProcessCgroupError {
				errs.AddPartial(1, fmt.Errorf("error reading process cgroup for pid %v: %w", pid, err))
			}
			continue
		}

		executable := &executableMetadata{name: name, path: exe, cgroup: cgroup}

		command, err := getProcessCommand(ctx, handle)
		if err != nil {
			errs.AddPartial(0, fmt.Errorf("error reading command for process %q (pid %v): %w", executable.name, pid, err))
		}

		username, err := handle.UsernameWithContext(ctx)
		if err != nil && !s.config.MuteProcessUserError {
			errs.AddPartial(0, fmt.Errorf("error reading username for process %q (pid %v): %w", executable.name, pid, err))
		}

		createTime, err := s.getProcessCreateTime(handle, ctx)
		// Read the clock once so that, when the create time is unknown, the
		// process's computed age below is exactly zero.
		nowMillis := time.Now().UnixMilli()
		if err != nil {
			errs.AddPartial(0, fmt.Errorf("error reading create time for process %q (pid %v): %w", executable.name, pid, err))
			// set the start time to now to avoid including this when a scrape_process_delay is set
			createTime = nowMillis
		}
		if s.scrapeProcessDelay.Milliseconds() > nowMillis-createTime {
			continue
		}

		parentProcessID := int32(0)
		if s.config.ResourceAttributes.ProcessParentPid.Enabled {
			parentProcessID, err = parentPid(ctx, handle, pid)
			if err != nil {
				errs.AddPartial(0, fmt.Errorf("error reading parent pid for process %q (pid %v): %w", executable.name, pid, err))
			}
		}

		data = append(data, &processMetadata{
			pid:        pid,
			parentPid:  parentProcessID,
			executable: executable,
			command:    command,
			username:   username,
			handle:     handle,
			createTime: createTime,
		})
	}

	return data, errs.Combine()
}
// scrapeAndAppendCPUTimeMetric records the ProcessCPUTime and/or
// ProcessCPUUtilization metrics for one process.
//
// The process CPU times are read only when at least one of the two metrics is
// enabled. Utilization is computed by a per-PID ucal.CPUUtilizationCalculator
// that is created on first use and kept in s.ucals across scrapes.
func (s *processScraper) scrapeAndAppendCPUTimeMetric(ctx context.Context, now pcommon.Timestamp, handle processHandle, pid int32) error {
	timeEnabled := s.config.Metrics.ProcessCPUTime.Enabled
	utilizationEnabled := s.config.Metrics.ProcessCPUUtilization.Enabled
	if !(timeEnabled || utilizationEnabled) {
		return nil
	}

	cpuTimes, err := handle.TimesWithContext(ctx)
	if err != nil {
		return err
	}

	if timeEnabled {
		s.recordCPUTimeMetric(now, cpuTimes)
	}
	if !utilizationEnabled {
		return nil
	}

	calc, found := s.ucals[pid]
	if !found {
		calc = &ucal.CPUUtilizationCalculator{}
		s.ucals[pid] = calc
	}
	return calc.CalculateAndRecord(now, s.logicalCores, cpuTimes, s.recordCPUUtilization)
}
// scrapeAndAppendMemoryUsageMetrics reads the process memory info once and
// records RSS as the memory usage data point and VMS as the virtual memory data
// point. Nothing is read when both metrics are disabled. When only one is
// enabled, both values are still handed to the MetricsBuilder (presumably it
// drops disabled metrics — confirm).
func (s *processScraper) scrapeAndAppendMemoryUsageMetrics(ctx context.Context, now pcommon.Timestamp, handle processHandle) error {
	if !(s.config.Metrics.ProcessMemoryUsage.Enabled || s.config.Metrics.ProcessMemoryVirtual.Enabled) {
		return nil
	}

	info, err := handle.MemoryInfoWithContext(ctx)
	if err != nil {
		return err
	}

	rss, vms := int64(info.RSS), int64(info.VMS)
	s.mb.RecordProcessMemoryUsageDataPoint(now, rss)
	s.mb.RecordProcessMemoryVirtualDataPoint(now, vms)
	return nil
}
// scrapeAndAppendMemoryUtilizationMetric records the process memory percentage
// as a float64 data point when the ProcessMemoryUtilization metric is enabled.
func (s *processScraper) scrapeAndAppendMemoryUtilizationMetric(ctx context.Context, now pcommon.Timestamp, handle processHandle) error {
	if s.config.Metrics.ProcessMemoryUtilization.Enabled {
		percent, err := handle.MemoryPercentWithContext(ctx)
		if err != nil {
			return err
		}
		s.mb.RecordProcessMemoryUtilizationDataPoint(now, float64(percent))
	}
	return nil
}
// scrapeAndAppendDiskMetrics records the process disk I/O byte counts and
// operation counts, each split into read and write data points.
//
// It does nothing on darwin or when both disk metrics are disabled. If the I/O
// counters cannot be read, the error is returned unless MuteProcessIOError is
// set, in which case nil is returned and nothing is recorded.
func (s *processScraper) scrapeAndAppendDiskMetrics(ctx context.Context, now pcommon.Timestamp, handle processHandle) error {
	diskEnabled := s.config.Metrics.ProcessDiskIo.Enabled || s.config.Metrics.ProcessDiskOperations.Enabled
	if !diskEnabled || runtime.GOOS == "darwin" {
		return nil
	}

	counters, err := handle.IOCountersWithContext(ctx)
	if err != nil && s.config.MuteProcessIOError {
		return nil
	}
	if err != nil {
		return err
	}

	s.mb.RecordProcessDiskIoDataPoint(now, int64(counters.ReadBytes), metadata.AttributeDirectionRead)
	s.mb.RecordProcessDiskIoDataPoint(now, int64(counters.WriteBytes), metadata.AttributeDirectionWrite)
	s.mb.RecordProcessDiskOperationsDataPoint(now, int64(counters.ReadCount), metadata.AttributeDirectionRead)
	s.mb.RecordProcessDiskOperationsDataPoint(now, int64(counters.WriteCount), metadata.AttributeDirectionWrite)
	return nil
}
// scrapeAndAppendPagingMetric records the process major and minor page fault
// counts as two data points of the ProcessPagingFaults metric, distinguished
// by the paging fault type attribute.
func (s *processScraper) scrapeAndAppendPagingMetric(ctx context.Context, now pcommon.Timestamp, handle processHandle) error {
	if !s.config.Metrics.ProcessPagingFaults.Enabled {
		return nil
	}

	faults, err := handle.PageFaultsWithContext(ctx)
	if err != nil {
		return err
	}

	major, minor := int64(faults.MajorFaults), int64(faults.MinorFaults)
	s.mb.RecordProcessPagingFaultsDataPoint(now, major, metadata.AttributePagingFaultTypeMajor)
	s.mb.RecordProcessPagingFaultsDataPoint(now, minor, metadata.AttributePagingFaultTypeMinor)
	return nil
}
// scrapeAndAppendThreadsMetrics records the process thread count when the
// ProcessThreads metric is enabled.
func (s *processScraper) scrapeAndAppendThreadsMetrics(ctx context.Context, now pcommon.Timestamp, handle processHandle) error {
	if s.config.Metrics.ProcessThreads.Enabled {
		count, err := handle.NumThreadsWithContext(ctx)
		if err != nil {
			return err
		}
		s.mb.RecordProcessThreadsDataPoint(now, int64(count))
	}
	return nil
}
// scrapeAndAppendContextSwitchMetrics records the involuntary and voluntary
// context switch counts (in that order) as two data points of the
// ProcessContextSwitches metric.
func (s *processScraper) scrapeAndAppendContextSwitchMetrics(ctx context.Context, now pcommon.Timestamp, handle processHandle) error {
	if s.config.Metrics.ProcessContextSwitches.Enabled {
		switches, err := handle.NumCtxSwitchesWithContext(ctx)
		if err != nil {
			return err
		}
		s.mb.RecordProcessContextSwitchesDataPoint(now, switches.Involuntary, metadata.AttributeContextSwitchTypeInvoluntary)
		s.mb.RecordProcessContextSwitchesDataPoint(now, switches.Voluntary, metadata.AttributeContextSwitchTypeVoluntary)
	}
	return nil
}
// scrapeAndAppendOpenFileDescriptorsMetric records the number of open file
// descriptors of the process when the ProcessOpenFileDescriptors metric is
// enabled.
func (s *processScraper) scrapeAndAppendOpenFileDescriptorsMetric(ctx context.Context, now pcommon.Timestamp, handle processHandle) error {
	if s.config.Metrics.ProcessOpenFileDescriptors.Enabled {
		openFDs, err := handle.NumFDsWithContext(ctx)
		if err != nil {
			return err
		}
		s.mb.RecordProcessOpenFileDescriptorsDataPoint(now, int64(openFDs))
	}
	return nil
}
// scrapeAndAppendHandlesMetric records the process handle count when the
// ProcessHandles metric is enabled.
func (s *processScraper) scrapeAndAppendHandlesMetric(ctx context.Context, now pcommon.Timestamp, handle processHandle) error {
	if s.config.Metrics.ProcessHandles.Enabled {
		count, err := handle.GetProcessHandleCountWithContext(ctx)
		if err != nil {
			return err
		}
		s.mb.RecordProcessHandlesDataPoint(now, count)
	}
	return nil
}
// scrapeAndAppendSignalsPendingMetric records the Used value of the first
// RLIMIT_SIGPENDING entry in the process rlimit usage, which is requested with
// usage gathering enabled. Nothing is recorded if no such entry exists.
func (s *processScraper) scrapeAndAppendSignalsPendingMetric(ctx context.Context, now pcommon.Timestamp, handle processHandle) error {
	if !s.config.Metrics.ProcessSignalsPending.Enabled {
		return nil
	}

	stats, err := handle.RlimitUsageWithContext(ctx, true)
	if err != nil {
		return err
	}

	for _, stat := range stats {
		if stat.Resource != process.RLIMIT_SIGPENDING {
			continue
		}
		s.mb.RecordProcessSignalsPendingDataPoint(now, int64(stat.Used))
		return nil
	}
	return nil
}
// scrapeAndAppendUptimeMetric records the process uptime in seconds, measured
// from the process create time to now, when the ProcessUptime metric is
// enabled.
func (s *processScraper) scrapeAndAppendUptimeMetric(ctx context.Context, now pcommon.Timestamp, handle processHandle) error {
	if !s.config.Metrics.ProcessUptime.Enabled {
		return nil
	}

	createdMillis, err := s.getProcessCreateTime(handle, ctx)
	if err != nil {
		return err
	}

	// The create time is in milliseconds since the Unix epoch; convert it to
	// nanoseconds for time.Unix.
	created := time.Unix(0, createdMillis*int64(time.Millisecond))
	s.mb.RecordProcessUptimeDataPoint(now, now.AsTime().Sub(created).Seconds())
	return nil
}