x-pack/auditbeat/module/system/process/quark_provider_linux.go (261 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build linux && (amd64 || arm64) && cgo

package process

import (
	"fmt"
	"os/user"
	"strconv"
	"time"

	"github.com/elastic/beats/v7/auditbeat/helper/hasher"
	"github.com/elastic/beats/v7/auditbeat/helper/tty"
	"github.com/elastic/beats/v7/libbeat/common/capabilities"
	"github.com/elastic/beats/v7/metricbeat/mb"
	"github.com/elastic/elastic-agent-libs/mapstr"
	"github.com/elastic/elastic-agent-libs/monitoring"

	quark "github.com/elastic/go-quark"
)

// quarkMetrics holds the monitoring variables registered by init under
// the "process@quark" registry. They are refreshed from the queue
// statistics by maybeUpdateMetrics.
var quarkMetrics = struct {
	insertions      *monitoring.Uint
	removals        *monitoring.Uint
	aggregations    *monitoring.Uint
	nonAggregations *monitoring.Uint
	lost            *monitoring.Uint
	backend         *monitoring.String
}{}

// init creates the "process@quark" monitoring registry and registers
// every quarkMetrics variable in it.
func init() {
	reg := monitoring.Default.NewRegistry("process@quark")
	quarkMetrics.insertions = monitoring.NewUint(reg, "insertions")
	quarkMetrics.removals = monitoring.NewUint(reg, "removals")
	quarkMetrics.aggregations = monitoring.NewUint(reg, "aggregations")
	quarkMetrics.nonAggregations = monitoring.NewUint(reg, "non_aggregations")
	quarkMetrics.lost = monitoring.NewUint(reg, "lost")
	quarkMetrics.backend = monitoring.NewString(reg, "backend", monitoring.Report)
}

// QuarkMetricSet is a MetricSet with added members used only in and by
// quark. QuarkMetricSet uses mb.PushReporterV2 instead of
// mb.ReporterV2. More notably we don't do periodic state reports and
// we don't need a cache as it is provided by quark.
type QuarkMetricSet struct {
	MetricSet
	queue        *quark.Queue         // Quark runtime state
	selfMntNsIno uint32               // Mnt inode from current process
	cachedHasher *hasher.CachedHasher // Hashes process executables in toEvent
}

// Used for testing only and not exposed via config. When true,
// NewFromQuark clears all backend flags on the queue attributes and
// requests only the kprobe backend.
var quarkForceKprobe bool

// NewFromQuark instantiates the module with quark's backend.
func NewFromQuark(ms MetricSet) (mb.MetricSet, error) { var qm QuarkMetricSet qm.MetricSet = ms ino64, err := selfNsIno("mnt") if err != nil { return nil, fmt.Errorf("failed to fetch self mount inode: %w", err) } qm.selfMntNsIno = uint32(ino64) qm.cachedHasher, err = hasher.NewFileHasherWithCache(qm.config.HasherConfig, 4096) if err != nil { return nil, fmt.Errorf("can't create hash cache: %w", err) } attr := quark.DefaultQueueAttr() if quarkForceKprobe { attr.Flags &= ^quark.QQ_ALL_BACKENDS attr.Flags |= quark.QQ_KPROBE } qm.queue, err = quark.OpenQueue(attr, 1) if err != nil { qm.cachedHasher.Close() return nil, fmt.Errorf("can't open quark queue: %w", err) } stats := qm.queue.Stats() if stats.Backend == quark.QQ_EBPF { qm.log.Info("quark using EBPF") } else if stats.Backend == quark.QQ_KPROBE { qm.log.Info("quark using KPROBES") } else { qm.queue.Close() qm.cachedHasher.Close() return nil, fmt.Errorf("quark has an invalid backend") } return &qm, nil } // Run reads events from quark's queue and pushes them into output. // The queue is owned by this goroutine and should not be touched // from outside as there is no synchronization. 
func (ms *QuarkMetricSet) Run(r mb.PushReporterV2) { ms.log.Info("Quark running") metricsStamp := time.Now() MainLoop: for { // Poll for done select { case <-r.Done(): break MainLoop default: } ms.maybeUpdateMetrics(&metricsStamp) x := time.Now() quarkEvents, err := ms.queue.GetEvents() if len(quarkEvents) == 1 { ms.log.Debugf("getevents took %v", time.Since(x)) } if err != nil { ms.log.Error("quark GetEvents, unrecoverable error", err) break MainLoop } if len(quarkEvents) == 0 { err = ms.queue.Block() if err != nil { ms.log.Error("quark Block, unrecoverable error", err) break MainLoop } continue } for _, quarkEvent := range quarkEvents { if !wantedEvent(quarkEvent) { continue } if event, ok := ms.toEvent(quarkEvent); ok { r.Event(event) } } } // Queue is owned by this goroutine, if we ever access it from // outside, we need to consider synchronization. ms.cachedHasher.Close() ms.queue.Close() ms.queue = nil } // toEvent converts a quark.Event to a mb.Event, returns true if we // were able to make an event. 
func (ms *QuarkMetricSet) toEvent(quarkEvent quark.Event) (mb.Event, bool) { action, evtype := actionAndTypeOfEvent(quarkEvent) process := quarkEvent.Process event := mb.Event{RootFields: mapstr.M{}} var username string var processErr error defer func() { // Fill out root message and error.message event.RootFields.Put("message", makeMessage(int(process.Pid), action, process.Comm, username, processErr)) if processErr != nil { event.RootFields.Put("error.message", processErr.Error()) } }() // Values that are independent of Proc.Valid // Fill out event.* event.RootFields.Put("event.type", evtype) event.RootFields.Put("event.action", action.String()) event.RootFields.Put("event.category", []string{"process"}) event.RootFields.Put("event.kind", "event") // Fill out process.* event.RootFields.Put("process.name", process.Comm) event.RootFields.Put("process.args", process.Cmdline) event.RootFields.Put("process.args_count", len(process.Cmdline)) event.RootFields.Put("process.pid", process.Pid) event.RootFields.Put("process.working_directory", process.Cwd) event.RootFields.Put("process.executable", process.Filename) if process.Exit.Valid { event.RootFields.Put("process.exit_code", process.Exit.ExitCode) } if !process.Proc.Valid { return event, true } // // Code below can rely on Proc // // Ids event.RootFields.Put("process.parent.pid", process.Proc.Ppid) startTime := time.Unix(0, int64(process.Proc.TimeBoot)) if ms.HostID() != "" { // TODO unify with sessionview and guarantee loss of precision event.RootFields.Put("process.entity_id", entityID(ms.HostID(), int(process.Pid), startTime)) } event.RootFields.Put("process.start", startTime) event.RootFields.Put("user.id", process.Proc.Uid) event.RootFields.Put("user.group.id", process.Proc.Gid) event.RootFields.Put("user.effective.id", process.Proc.Euid) event.RootFields.Put("user.effective.group.id", process.Proc.Egid) event.RootFields.Put("user.saved.id", process.Proc.Suid) event.RootFields.Put("user.saved.group.id", 
process.Proc.Sgid) if us, err := user.LookupId(strconv.FormatUint(uint64(process.Proc.Uid), 10)); err == nil { event.RootFields.Put("user.name", us.Username) username = us.Username } if group, err := user.LookupGroupId(strconv.FormatUint(uint64(process.Proc.Gid), 10)); err == nil { event.RootFields.Put("user.group.name", group.Name) } // Tty things event.RootFields.Put("process.interactive", tty.InteractiveFromTTY(tty.TTYDev{ Major: process.Proc.TtyMajor, Minor: process.Proc.TtyMinor, })) if process.Proc.TtyMajor != 0 { event.RootFields.Put("process.tty.char_device.major", process.Proc.TtyMajor) event.RootFields.Put("process.tty.char_device.minor", process.Proc.TtyMinor) } // Capabilities capEffective, _ := capabilities.FromUint64(process.Proc.CapEffective) if len(capEffective) > 0 { event.RootFields.Put("process.thread.capabilities.effective", capEffective) } capPermitted, _ := capabilities.FromUint64(process.Proc.CapPermitted) if len(capPermitted) > 0 { event.RootFields.Put("process.thread.capabilities.permitted", capPermitted) } // If we are in the same mount namespace of the process, hash // the file. When quark is running on kprobes, there are // limitations concerning the full path of the filename, in // those cases, the path won't start with a slash. if process.Proc.MntInonum == ms.selfMntNsIno && len(process.Filename) > 0 && process.Filename[0] == '/' { hashes, err := ms.cachedHasher.HashFile(process.Filename) if err != nil { processErr = fmt.Errorf("failed to hash executable %v for PID %v: %w", process.Filename, process.Pid, err) ms.log.Warn(processErr.Error()) } else { for hashType, digest := range hashes { fieldName := "process.hash." + string(hashType) event.RootFields.Put(fieldName, digest) } } } else { ms.log.Debugf("skipping hash %s (inonum %d vs %d)", process.Filename, process.Proc.MntInonum, ms.selfMntNsIno) } return event, true } // wantedEvent filters in only the wanted events from quark. 
func wantedEvent(quarkEvent quark.Event) bool { const wanted uint64 = quark.QUARK_EV_FORK | quark.QUARK_EV_EXEC | quark.QUARK_EV_EXIT | quark.QUARK_EV_SNAPSHOT if quarkEvent.Events&wanted == 0 || quarkEvent.Process.Pid == 2 || quarkEvent.Process.Proc.Ppid == 2 { // skip kthreads return false } return true } // actionAndTypeOfEvent computes eventAction and event.type out of a quark.Event. func actionAndTypeOfEvent(quarkEvent quark.Event) (eventAction, []string) { snap := quarkEvent.Events&quark.QUARK_EV_SNAPSHOT != 0 fork := quarkEvent.Events&quark.QUARK_EV_FORK != 0 exec := quarkEvent.Events&quark.QUARK_EV_EXEC != 0 exit := quarkEvent.Events&quark.QUARK_EV_EXIT != 0 // Calculate event.action // If it's a snap, it's existing // If it forked + exited and executed or not, we consider ran // If it execed + exited we consider stopped // If it execed but didn't fork or exit, we consider changed image var action eventAction if snap { action = eventActionExistingProcess } else if fork && exit { action = eventActionProcessRan } else if fork { action = eventActionProcessStarted } else if exit { action = eventActionProcessStopped } else if exec { action = eventActionProcessChangedImage } else { action = eventActionProcessError } // Calculate event.type evtype := make([]string, 0, 4) if snap { evtype = append(evtype, eventActionExistingProcess.Type()) } if fork { evtype = append(evtype, eventActionProcessStarted.Type()) } if exec { evtype = append(evtype, eventActionProcessChangedImage.Type()) } if exit { evtype = append(evtype, eventActionProcessStopped.Type()) } return action, evtype } func (ms *QuarkMetricSet) maybeUpdateMetrics(stamp *time.Time) { if time.Since(*stamp) < time.Second*5 { return } stats := ms.queue.Stats() quarkMetrics.insertions.Set(stats.Insertions) quarkMetrics.removals.Set(stats.Removals) quarkMetrics.aggregations.Set(stats.Aggregations) quarkMetrics.nonAggregations.Set(stats.NonAggregations) quarkMetrics.lost.Set(stats.Lost) if stats.Backend == 
quark.QQ_EBPF { quarkMetrics.backend.Set("ebpf") } else if stats.Backend == quark.QQ_KPROBE { quarkMetrics.backend.Set("kprobe") } else { quarkMetrics.backend.Set("invalid") } *stamp = time.Now() }