x-pack/auditbeat/module/system/process/gosysinfo_provider.go (334 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package process
import (
	"errors"
	"fmt"
	"os"
	"os/user"
	"runtime"
	"strconv"
	"time"

	"github.com/cespare/xxhash/v2"
	"github.com/gofrs/uuid/v5"

	"github.com/elastic/beats/v7/auditbeat/datastore"
	"github.com/elastic/beats/v7/auditbeat/helper/hasher"
	"github.com/elastic/beats/v7/libbeat/common/capabilities"
	"github.com/elastic/beats/v7/metricbeat/mb"
	"github.com/elastic/beats/v7/x-pack/auditbeat/cache"
	"github.com/elastic/elastic-agent-libs/mapstr"
	"github.com/elastic/go-sysinfo"
	"github.com/elastic/go-sysinfo/types"
)
const (
	// bucketName is the name of the datastore bucket that NewFromSysInfo opens
	// to persist this metricset's state across runs.
	bucketName = "auditbeat.process.v1"
	// bucketKeyStateTimestamp is the key inside the bucket under which the
	// binary-encoded time of the last state report is stored.
	bucketKeyStateTimestamp = "state_timestamp"
)
// SysInfoMetricSet collects data about the processes running on the host,
// using go-sysinfo as the source of process information.
type SysInfoMetricSet struct {
	// MetricSet is the embedded base metricset; the methods in this file use
	// its logger, configuration and host ID.
	MetricSet
	// hasher computes the executable hashes attached to processes by
	// enrichProcess.
	hasher *hasher.FileHasher
	// cache holds the processes seen by the previous collection; diffing
	// against it yields the started and stopped processes.
	cache *cache.Cache[*Process]
	// bucket is the persistent datastore bucket in which the timestamp of the
	// last state report is kept.
	bucket datastore.Bucket
	// lastState is the time the full process state was last reported. It is
	// loaded from bucket on construction and written back by reportState.
	lastState time.Time
	// suppressPermissionWarnings is set once the first permission warning has
	// been logged by getProcesses, so that it is only logged once.
	suppressPermissionWarnings bool
}
// Process represents information about a process.
type Process struct {
	// Info is the process information obtained from go-sysinfo.
	Info types.ProcessInfo
	// UserInfo holds the user and group IDs obtained from go-sysinfo. It is
	// nil when they could not be loaded.
	UserInfo *types.UserInfo
	// User is the result of looking up UserInfo.UID in enrichProcess. It is
	// nil when UserInfo is nil or the lookup failed.
	User *user.User
	// Group is the result of looking up UserInfo.GID in enrichProcess. It is
	// nil when UserInfo is nil or the lookup failed.
	Group *user.Group
	// CapEffective lists the effective capabilities; getProcesses only
	// populates it on Linux.
	CapEffective []string
	// CapPermitted lists the permitted capabilities; getProcesses only
	// populates it on Linux.
	CapPermitted []string
	// Hashes holds the digests of the process executable keyed by hash type,
	// as computed in enrichProcess.
	Hashes map[hasher.HashType]hasher.Digest
	// Error is the first error encountered while collecting or enriching the
	// process; later errors do not overwrite it.
	Error error
}
// Hash returns a 64-bit xxHash digest computed over the process's PID followed
// by the string form of its start time.
func (p Process) Hash() uint64 {
	digest := xxhash.New()

	pid := strconv.Itoa(p.Info.PID)
	started := p.Info.StartTime.String()

	// WriteString on the digest always returns a nil error, so the results are
	// discarded explicitly.
	_, _ = digest.WriteString(pid)
	_, _ = digest.WriteString(started)

	return digest.Sum64()
}
// toMapStr converts the process information into the map placed under the
// "process" key of an event. Field names follow the ECS process fields, see
// https://github.com/elastic/ecs#-process-fields
func (p Process) toMapStr() mapstr.M {
	info := p.Info

	fields := mapstr.M{
		"name":              info.Name,
		"args":              info.Args,
		"pid":               info.PID,
		"working_directory": info.CWD,
		"executable":        info.Exe,
		"start":             info.StartTime,
	}
	fields["parent"] = mapstr.M{"pid": info.PPID}

	return fields
}
// NewFromSysInfo constructs a new MetricSet backed by go-sysinfo.
//
// It opens the persistent datastore bucket, restores the time of the last
// state report from it (if one was stored) and creates the file hasher used
// for process executables. The bucket is closed again if any later step of
// the construction fails.
func NewFromSysInfo(ms MetricSet) (mb.MetricSet, error) {
	bucket, err := datastore.OpenBucket(bucketName)
	if err != nil {
		return nil, fmt.Errorf("failed to open persistent datastore: %w", err)
	}

	// Load from disk: time when state was last sent. An empty blob means no
	// timestamp has been stored and leaves lastState at its zero value.
	var lastState time.Time
	restoreTimestamp := func(blob []byte) error {
		if len(blob) == 0 {
			return nil
		}
		return lastState.UnmarshalBinary(blob)
	}
	if err = bucket.Load(bucketKeyStateTimestamp, restoreTimestamp); err != nil {
		bucket.Close()
		return nil, err
	}

	if lastState.IsZero() {
		ms.log.Debug("No state timestamp found")
	} else {
		ms.log.Debugf("Last state was sent at %v. Next state update by %v.",
			lastState, lastState.Add(ms.config.effectiveStatePeriod()))
	}

	fileHasher, err := hasher.NewFileHasher(ms.config.HasherConfig, nil)
	if err != nil {
		bucket.Close()
		return nil, err
	}

	if runtime.GOOS != "windows" && os.Geteuid() != 0 {
		ms.log.Warn("Running as non-root user, will likely not report all processes.")
	}

	return &SysInfoMetricSet{
		MetricSet: ms,
		hasher:    fileHasher,
		cache:     cache.New[*Process](),
		bucket:    bucket,
		lastState: lastState,
	}, nil
}
// Close cleans up the MetricSet when it finishes by closing the datastore
// bucket, if one is set, and returning the result of that close.
func (ms *SysInfoMetricSet) Close() error {
	if ms.bucket == nil {
		return nil
	}
	return ms.bucket.Close()
}
// Fetch collects process information. It is invoked periodically.
//
// A full state report is sent first when the state period has elapsed since
// the last one or when the cache is empty; afterwards the changes since the
// previous collection are reported. Errors from either step are logged and
// forwarded to the reporter.
func (ms *SysInfoMetricSet) Fetch(report mb.ReporterV2) {
	stateDue := time.Since(ms.lastState) > ms.config.effectiveStatePeriod()
	cacheEmpty := ms.cache.IsEmpty()

	if stateDue || cacheEmpty {
		ms.log.Debugf("State update needed (needsStateUpdate=%v, cache.IsEmpty()=%v)", stateDue, cacheEmpty)
		if err := ms.reportState(report); err != nil {
			ms.log.Error(err)
			report.Error(err)
		}
		ms.log.Debugf("Next state update by %v", ms.lastState.Add(ms.config.effectiveStatePeriod()))
	}

	if err := ms.reportChanges(report); err != nil {
		ms.log.Error(err)
		report.Error(err)
	}
}
// reportState reports all running processes on the system.
//
// Every process is enriched and reported either as a state event carrying a
// shared event.id, or as an error event when collecting or enriching it
// failed. The cache is then updated with the current processes and the state
// timestamp is written to the datastore.
func (ms *SysInfoMetricSet) reportState(report mb.ReporterV2) error {
	// Only update lastState if this state update was regularly scheduled,
	// i.e. not caused by an Auditbeat restart (when the cache would be empty).
	if !ms.cache.IsEmpty() {
		ms.lastState = time.Now()
	}

	processes, err := ms.getProcesses()
	if err != nil {
		return fmt.Errorf("failed to get process infos: %w", err)
	}
	ms.log.Debugf("Found %v processes", len(processes))

	stateID, err := uuid.NewV4()
	if err != nil {
		return fmt.Errorf("error generating state ID: %w", err)
	}
	eventID := stateID.String()

	for _, proc := range processes {
		ms.enrichProcess(proc)

		if proc.Error != nil {
			ms.log.Warn(proc.Error)
			report.Event(ms.processEvent(proc, eventTypeEvent, eventActionProcessError))
			continue
		}

		stateEvent := ms.processEvent(proc, eventTypeState, eventActionExistingProcess)
		stateEvent.RootFields.Put("event.id", eventID)
		report.Event(stateEvent)
	}

	if ms.cache != nil {
		// This will initialize the cache with the current processes
		ms.cache.DiffAndUpdateCache(processes)
	}

	// Save time so we know when to send the state again (config.StatePeriod)
	encoded, err := ms.lastState.MarshalBinary()
	if err != nil {
		return err
	}
	if err := ms.bucket.Store(bucketKeyStateTimestamp, encoded); err != nil {
		return fmt.Errorf("error writing state timestamp to disk: %w", err)
	}

	return nil
}
// reportChanges detects and reports any changes to processes on this system since the last call.
//
// The current process list is diffed against the cache. Started processes are
// enriched and reported as started (or as an error event when they carry an
// error); stopped processes are reported only when they carry no error.
func (ms *SysInfoMetricSet) reportChanges(report mb.ReporterV2) error {
	current, err := ms.getProcesses()
	if err != nil {
		return fmt.Errorf("failed to get processes: %w", err)
	}
	ms.log.Debugf("Found %v processes", len(current))

	started, stopped := ms.cache.DiffAndUpdateCache(current)

	for _, proc := range started {
		ms.enrichProcess(proc)

		if proc.Error != nil {
			ms.log.Warn(proc.Error)
			report.Event(ms.processEvent(proc, eventTypeEvent, eventActionProcessError))
			continue
		}
		report.Event(ms.processEvent(proc, eventTypeEvent, eventActionProcessStarted))
	}

	for _, proc := range stopped {
		if proc.Error != nil {
			continue
		}
		report.Event(ms.processEvent(proc, eventTypeEvent, eventActionProcessStopped))
	}

	return nil
}
// enrichProcess enriches a process with user lookup information
// and executable file hash.
//
// User and group lookups are best effort: a failed lookup leaves the
// corresponding field nil. The executable is hashed only when the process has
// an executable path and isNsSharedWith reports its "mnt" namespace as shared
// (presumably with Auditbeat's own namespace - confirm against that helper).
// A failure is stored in process.Error unless an earlier error is already
// recorded there.
func (ms *SysInfoMetricSet) enrichProcess(process *Process) {
	if ids := process.UserInfo; ids != nil {
		if u, err := user.LookupId(ids.UID); err == nil {
			process.User = u
		}
		if g, err := user.LookupGroupId(ids.GID); err == nil {
			process.Group = g
		}
	}

	if process.Info.Exe == "" {
		return
	}

	shared, err := isNsSharedWith(process.Info.PID, "mnt")
	switch {
	case err != nil:
		if process.Error == nil {
			process.Error = fmt.Errorf("failed to get namespaces for %v PID %v: %w", process.Info.Exe, process.Info.PID, err)
		}
		return
	case !shared:
		return
	}

	digests, err := ms.hasher.HashFile(process.Info.Exe)
	if err == nil {
		process.Hashes = digests
		return
	}
	if process.Error == nil {
		process.Error = fmt.Errorf("failed to hash executable %v for PID %v: %w", process.Info.Exe, process.Info.PID, err)
	}
}
// processEvent builds the event reported for process.
//
// The event always carries the "event", "process" and "message" fields.
// User IDs and names, capabilities, executable hashes, the error message and
// the process entity ID are added only when the corresponding data is
// available.
func (ms *SysInfoMetricSet) processEvent(process *Process, eventType string, action eventAction) mb.Event {
	event := mb.Event{
		RootFields: mapstr.M{
			"event": mapstr.M{
				"kind":     eventType,
				"category": []string{"process"},
				"type":     []string{action.Type()},
				"action":   action.String(),
			},
			"process": process.toMapStr(),
			"message": processMessage(process, action),
		},
	}
	root := &event.RootFields

	if ids := process.UserInfo; ids != nil {
		idFields := [...][2]string{
			{"user.id", ids.UID},
			{"user.group.id", ids.GID},
			{"user.effective.id", ids.EUID},
			{"user.effective.group.id", ids.EGID},
			{"user.saved.id", ids.SUID},
			{"user.saved.group.id", ids.SGID},
		}
		for _, field := range idFields {
			putIfNotEmpty(root, field[0], field[1])
		}
	}

	if u := process.User; u != nil {
		// Prefer the login name and fall back to the display name.
		switch {
		case u.Username != "":
			root.Put("user.name", u.Username)
		case u.Name != "":
			root.Put("user.name", u.Name)
		}
	}

	if g := process.Group; g != nil {
		root.Put("user.group.name", g.Name)
	}

	if len(process.CapEffective) > 0 {
		root.Put("process.thread.capabilities.effective", process.CapEffective)
	}
	if len(process.CapPermitted) > 0 {
		root.Put("process.thread.capabilities.permitted", process.CapPermitted)
	}

	// Ranging over a nil map is a no-op, so no separate nil check is needed.
	for hashType, digest := range process.Hashes {
		root.Put("process.hash."+string(hashType), digest)
	}

	if procErr := process.Error; procErr != nil {
		root.Put("error.message", procErr.Error())
	}

	if hostID := ms.HostID(); hostID != "" {
		root.Put("process.entity_id",
			entityID(hostID, process.Info.PID, process.Info.StartTime))
	}

	return event
}
// putIfNotEmpty stores value under key in the given map, unless value is the
// empty string, in which case the map is left untouched.
func putIfNotEmpty(mapstr *mapstr.M, key string, value string) {
	if value == "" {
		return
	}
	mapstr.Put(key, value)
}
// processMessage builds the event message for process and action by passing
// the PID, action, process name, username and process error to makeMessage.
// The username is empty when no user was resolved for the process.
func processMessage(process *Process, action eventAction) string {
	username := ""
	if u := process.User; u != nil {
		username = u.Username
	}

	info := process.Info
	return makeMessage(info.PID, action, info.Name, username, process.Error)
}
// getProcesses returns the processes currently reported by go-sysinfo.
//
// Processes that disappeared between listing and inspection are skipped, as
// are processes whose information cannot be read for permission reasons when
// not running as root (a warning is logged once for the latter). Any other
// failure is recorded in the returned Process's Error field, which keeps only
// the first error encountered. On Linux, processes owned by UID "0" that have
// no executable path are excluded as kernel processes, and the effective and
// permitted capabilities are collected for the rest.
//
// An error is returned only when the process list itself cannot be fetched.
func (ms *SysInfoMetricSet) getProcesses() ([]*Process, error) {
	sysinfoProcs, err := sysinfo.Processes()
	if err != nil {
		return nil, fmt.Errorf("failed to fetch processes: %w", err)
	}

	processes := make([]*Process, 0, len(sysinfoProcs))
	for _, sysinfoProc := range sysinfoProcs {
		pid := sysinfoProc.PID()

		pInfo, infoErr := sysinfoProc.Info()
		if infoErr != nil {
			// errors.Is also matches wrapped errors, which os.IsNotExist and
			// os.IsPermission do not.
			switch {
			case errors.Is(infoErr, os.ErrNotExist):
				// Skip - process probably just terminated since our call to Processes().
				continue
			case os.Geteuid() != 0 && errors.Is(infoErr, os.ErrPermission):
				// Running as non-root, permission issues when trying to access
				// other user's private process information are expected.
				if !ms.suppressPermissionWarnings {
					ms.log.Warnf("Failed to load process information for PID %d as non-root user. "+
						"Will suppress further errors of this kind. Error: %v", pid, infoErr)

					// Only warn once at the start of Auditbeat.
					ms.suppressPermissionWarnings = true
				}
				continue
			}
		}

		process := &Process{Info: pInfo}
		if infoErr != nil {
			// Record what we can and continue.
			process.Error = fmt.Errorf("failed to load process information for PID %d: %w", pid, infoErr)
			process.Info.PID = pid // in case pInfo did not contain it
		}

		userInfo, userErr := sysinfoProc.User()
		if userErr == nil {
			process.UserInfo = &userInfo
		} else if process.Error == nil {
			process.Error = fmt.Errorf("failed to load user for PID %d: %w", pid, userErr)
		}

		if runtime.GOOS == "linux" {
			// Exclude Linux kernel processes, they are not very interesting.
			// userInfo is the zero value when User() failed, so such
			// processes are never excluded here.
			if userInfo.UID == "0" && process.Info.Exe == "" {
				continue
			}

			// Fetch Effective and Permitted capabilities. Use the PID stored
			// on the process rather than pInfo.PID: pInfo may not contain the
			// PID when Info() failed.
			var capErr error
			process.CapEffective, capErr = capabilities.FromPid(capabilities.Effective, process.Info.PID)
			if capErr != nil && process.Error == nil {
				process.Error = capErr
			}
			process.CapPermitted, capErr = capabilities.FromPid(capabilities.Permitted, process.Info.PID)
			if capErr != nil && process.Error == nil {
				process.Error = capErr
			}
		}

		processes = append(processes, process)
	}

	return processes, nil
}