x-pack/auditbeat/module/system/package/package.go (610 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
//go:build !windows
package pkg
import (
"bufio"
"bytes"
"encoding/binary"
"encoding/gob"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"syscall"
"time"
"github.com/cespare/xxhash/v2"
"github.com/gofrs/uuid/v5"
"go.etcd.io/bbolt"
"github.com/elastic/beats/v7/auditbeat/ab"
"github.com/elastic/beats/v7/auditbeat/datastore"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/x-pack/auditbeat/cache"
"github.com/elastic/beats/v7/x-pack/auditbeat/module/system"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
// Identity of this metricset and the layout of its persistent datastore.
const (
	metricsetName = "package"
	namespace     = "system.audit.package"

	// bucketNameV2 is the datastore bucket for the current storage schema.
	bucketNameV2 = "package.v2"
	// Keys stored inside bucketNameV2.
	bucketKeyPackages       = "packages"
	bucketKeyStateTimestamp = "state_timestamp"
)

// Values written to the event.kind field.
const (
	eventTypeState = "state"
	eventTypeEvent = "event"
)

// Filesystem locations probed to detect installed package managers. They are
// variables rather than constants, presumably so they can be redirected
// (e.g. by tests).
var (
	rpmPath  = "/var/lib/rpm"
	dpkgPath = "/var/lib/dpkg"
	// homebrewCellarPath lists candidate Homebrew Cellar directories; they are
	// probed in order and only the first one that exists is used.
	homebrewCellarPath = []string{"/usr/local/Cellar", "/opt/homebrew/Cellar"}
)
// eventAction classifies what happened to a package between two scans.
type eventAction uint8

const (
	eventActionExistingPackage eventAction = iota
	eventActionPackageInstalled
	eventActionPackageRemoved
	eventActionPackageUpdated
)

// String returns the value used for the event.action field, or an empty
// string for an unknown action.
func (action eventAction) String() string {
	names := [...]string{
		eventActionExistingPackage:  "existing_package",
		eventActionPackageInstalled: "package_installed",
		eventActionPackageRemoved:   "package_removed",
		eventActionPackageUpdated:   "package_updated",
	}
	if int(action) >= len(names) {
		return ""
	}
	return names[action]
}

// Type returns the value used for the event.type field. Unknown actions map
// to "info".
func (action eventAction) Type() string {
	types := [...]string{
		eventActionExistingPackage:  "info",
		eventActionPackageInstalled: "installation",
		eventActionPackageRemoved:   "deletion",
		eventActionPackageUpdated:   "change",
	}
	if int(action) >= len(types) {
		return "info"
	}
	return types[action]
}
// init registers New as the factory for the "package" metricset of the
// system module, marking it as a default metricset and placing its fields
// under the system.audit.package namespace.
func init() {
	ab.Registry.MustAddMetricSet(
		system.ModuleName,
		metricsetName,
		New,
		mb.DefaultMetricSet(),
		mb.WithNamespace(namespace),
	)
}
// MetricSet collects data about the system's packages.
//
// On every Fetch it diffs the current package list against the previous one
// and periodically re-reports the full package state.
type MetricSet struct {
// SystemMetricSet supplies shared system-module behavior such as HostID().
system.SystemMetricSet
// config is the module configuration unpacked in New.
config config
// log is this metricset's logger.
log *logp.Logger
// cache holds the packages seen by the previous scan; DiffAndUpdateCache
// compares it with the current scan to find new and missing packages.
cache *cache.Cache[*Package]
// bucket is the persistent datastore bucket (bucketNameV2) that keeps the
// package list and the last state timestamp across restarts.
bucket datastore.Bucket
// lastState is when the full package state was last reported; Fetch sends a
// new state report once the effective state period has elapsed since then.
lastState time.Time
// suppressNoPackageWarnings is set after the "no supported package managers"
// warning has been logged so that it is emitted only once.
suppressNoPackageWarnings bool
}
// Package represents information for a package.
//
// Values are produced by the RPM, dpkg and Homebrew listings, persisted to the
// datastore, and turned into event fields by toMapStr. Fields a package
// manager does not provide stay at their zero value, and toMapStr omits
// zero-valued optional fields.
type Package struct {
Name string
Version string
// Release is the release string when the package manager provides one
// (listDebPackages never sets it).
Release string
Arch string
License string
InstallTime time.Time
// Size of the installed package. For dpkg it comes from Installed-Size,
// which parseDpkgInstalledSize documents as KiB.
// NOTE(review): units for the other package managers are not visible here.
Size uint64
// Summary is a short description (for dpkg: the first line of Description).
Summary string
URL string
// Type names the package manager that reported the package, e.g. "dpkg".
Type string
// error records a problem encountered while collecting this package; it is
// reported under error.message on the package's events.
error error
}
// Hash returns a 64-bit xxHash digest over the package name, version,
// release and size (little-endian). Packages are presumably keyed by this
// value in cache.Cache when diffing consecutive scans.
//
//nolint:errcheck // Writing to the hash never returns an error.
func (pkg Package) Hash() uint64 {
	digest := xxhash.New()
	for _, field := range [...]string{pkg.Name, pkg.Version, pkg.Release} {
		digest.WriteString(field)
	}
	binary.Write(digest, binary.LittleEndian, pkg.Size)
	return digest.Sum64()
}
// toMapStr converts the package into two field maps. The first holds the
// metricset-specific (non-ECS) fields and the second the ECS fields that
// packageEvent places under the root "package" key.
//
// Name and version are always present. Every other attribute is only added
// when it is set. Type has no non-ECS counterpart.
func (pkg Package) toMapStr() (mapstr.M, mapstr.M) {
	nonECS := mapstr.M{
		"name":    pkg.Name,
		"version": pkg.Version,
	}
	ecs := mapstr.M{
		"name":    pkg.Name,
		"version": pkg.Version,
	}

	if pkg.Release != "" {
		nonECS["release"] = pkg.Release
	}
	if pkg.Arch != "" {
		nonECS["arch"] = pkg.Arch
		// package.architecture must carry the architecture; it previously
		// (incorrectly) carried the license.
		ecs["architecture"] = pkg.Arch
	}
	if pkg.License != "" {
		nonECS["license"] = pkg.License
		ecs["license"] = pkg.License
	}
	if !pkg.InstallTime.IsZero() {
		nonECS["installtime"] = pkg.InstallTime
		ecs["installed"] = pkg.InstallTime
	}
	if pkg.Size != 0 {
		nonECS["size"] = pkg.Size
		ecs["size"] = pkg.Size
	}
	if pkg.Summary != "" {
		nonECS["summary"] = pkg.Summary
		ecs["description"] = pkg.Summary
	}
	if pkg.URL != "" {
		nonECS["url"] = pkg.URL
		ecs["reference"] = pkg.URL
	}
	if pkg.Type != "" {
		ecs["type"] = pkg.Type
	}

	return nonECS, ecs
}
// entityID returns an identifier for this package that is unique across
// machines. It hashes the host ID followed by the package name and version.
// The parts are written back-to-back with no separator; the exact byte
// sequence must stay the same or previously emitted IDs would stop matching.
func (pkg Package) entityID(hostID string) string {
	hasher := system.NewEntityHash()
	for _, part := range []string{hostID, pkg.Name, pkg.Version} {
		hasher.Write([]byte(part))
	}
	return hasher.Sum()
}
// New constructs a new MetricSet.
//
// It performs these steps in order:
//   - unpacks and validates the module configuration;
//   - migrates the datastore schema if needed and opens the persistent bucket;
//   - restores the last state timestamp;
//   - seeds the in-memory cache with the packages persisted by a previous run.
//
// If any step after the bucket has been opened fails, the bucket is closed
// before the error is returned, so that no datastore handle is leaked.
func New(base mb.BaseMetricSet) (_ mb.MetricSet, err error) {
	config := defaultConfig()
	if err = base.Module().UnpackConfig(&config); err != nil {
		return nil, fmt.Errorf("failed to unpack the %v/%v config: %w", system.ModuleName, metricsetName, err)
	}

	// Reject an unusable privilege-drop setting before touching the datastore.
	// Changing to another UID is only possible when running as root (or when
	// it is the UID we already run as).
	if config.PackageSuidDrop != nil && os.Getuid() != 0 && int(*config.PackageSuidDrop) != os.Getuid() {
		return nil, fmt.Errorf("package.rpm_drop_to_suid is set to %d, but we're running as a different non-root user", *config.PackageSuidDrop)
	}

	if err = datastore.Update(migrateDatastoreSchema); err != nil {
		return nil, fmt.Errorf("datastore schema migration failed: %w", err)
	}

	bucket, err := datastore.OpenBucket(bucketNameV2)
	if err != nil {
		return nil, fmt.Errorf("failed to open persistent datastore: %w", err)
	}
	// From here on, any returned error must also release the bucket.
	defer func() {
		if err != nil {
			err = errors.Join(err, bucket.Close())
		}
	}()

	ms := &MetricSet{
		SystemMetricSet: system.NewSystemMetricSet(base),
		config:          config,
		log:             logp.NewLogger(metricsetName),
		cache:           cache.New[*Package](),
		bucket:          bucket,
	}

	ms.lastState, err = loadStateTimestamp(bucket)
	if err != nil {
		return nil, fmt.Errorf("failed to load state timestamp from bucket %v: %w", bucketNameV2, err)
	}
	if ms.lastState.IsZero() {
		ms.log.Debug("No state timestamp found.")
	} else {
		ms.log.Debugf("Last state was sent at %v. Next state update by %v.", ms.lastState, ms.lastState.Add(ms.config.effectiveStatePeriod()))
	}

	if config.PackageSuidDrop != nil {
		ms.log.Debugf("Dropping to EUID %d from UID %d for RPM API calls", *ms.config.PackageSuidDrop, os.Getuid())
	}

	packages, err := loadPackages(ms.bucket)
	if err != nil {
		return nil, fmt.Errorf("failed to load persisted package metadata from disk: %w", err)
	}
	ms.log.Debugf("Loaded %d packages from disk", len(packages))
	ms.cache.DiffAndUpdateCache(packages)

	return ms, nil
}
// Close cleans up the MetricSet when it finishes.
func (ms *MetricSet) Close() error {
var errs []error
errs = append(errs, closeDataset())
if ms.bucket != nil {
errs = append(errs, ms.bucket.Close())
}
return errors.Join(errs...)
}
// Fetch collects data about the host. It is invoked periodically.
//
// The full package state is reported first, but only when the effective state
// period has elapsed since the previous state report. Changes since the last
// scan are then reported on every call. Errors are logged and forwarded to the
// reporter without stopping the fetch.
func (ms *MetricSet) Fetch(report mb.ReporterV2) {
	period := ms.config.effectiveStatePeriod()
	if time.Since(ms.lastState) > period {
		ms.log.Debug("Sending state")
		if err := ms.reportState(report); err != nil {
			ms.log.Error(err)
			report.Error(err)
		}
		ms.log.Debugf("Next state update by %v", ms.lastState.Add(ms.config.effectiveStatePeriod()))
	}

	if err := ms.reportChanges(report); err != nil {
		ms.log.Error(err)
		report.Error(err)
	}
}
// reportState reports all installed packages on the system.
//
// Each package produces one "state" event. All of them share a freshly
// generated event.id. Afterwards the cache is reset to the current package
// set, and both the state timestamp and the package list are persisted.
func (ms *MetricSet) reportState(report mb.ReporterV2) error {
	// Recorded before scanning, so a failed scan still postpones the next
	// state report by a full period (Fetch compares against lastState).
	ms.lastState = time.Now()

	packages, err := ms.getPackages()
	if err != nil {
		return fmt.Errorf("failed to get packages: %w", err)
	}

	stateID, err := uuid.NewV4()
	if err != nil {
		return fmt.Errorf("error generating state ID: %w", err)
	}
	eventID := stateID.String()

	for _, pkg := range packages {
		evt := ms.packageEvent(pkg, eventTypeState, eventActionExistingPackage)
		_, _ = evt.RootFields.Put("event.id", eventID)
		report.Event(evt)
	}

	// Reset the cache to this snapshot so the next reportChanges diffs
	// against it.
	ms.cache.DiffAndUpdateCache(packages)

	if err := storeStateTimestamp(ms.bucket, ms.lastState); err != nil {
		return fmt.Errorf("error persisting state timestamp: %w", err)
	}
	return storePackages(ms.bucket, packages)
}
// reportChanges detects and reports any changes to installed packages on this
// system since the last call.
//
// Events are classified as follows:
//   - Each package that disappeared is paired with at most one newly seen
//     package of the same name, preferring one with the same architecture,
//     and that new package is reported as "updated".
//   - Missing packages without a partner are reported as "removed".
//   - New packages that were not used as a partner are reported as
//     "installed".
//
// The package list is persisted whenever anything changed.
func (ms *MetricSet) reportChanges(report mb.ReporterV2) error {
	packages, err := ms.getPackages()
	if err != nil {
		return fmt.Errorf("failed to get packages: %w", err)
	}

	newPackages, missingPackages := ms.cache.DiffAndUpdateCache(packages)

	// paired[i] records that newPackages[i] was already reported as the
	// replacement of a missing package. Tracking this per entry rather than
	// per name guarantees that every new package is reported exactly once,
	// even when several packages share a name (multi-arch or multi-version
	// installs).
	paired := make([]bool, len(newPackages))
	for _, missingPkg := range missingPackages {
		idx := findReplacementPackage(missingPkg, newPackages, paired)
		if idx < 0 {
			report.Event(ms.packageEvent(missingPkg, eventTypeEvent, eventActionPackageRemoved))
			continue
		}
		paired[idx] = true
		report.Event(ms.packageEvent(newPackages[idx], eventTypeEvent, eventActionPackageUpdated))
	}

	for i, newPkg := range newPackages {
		if !paired[i] {
			report.Event(ms.packageEvent(newPkg, eventTypeEvent, eventActionPackageInstalled))
		}
	}

	if len(newPackages) > 0 || len(missingPackages) > 0 {
		return storePackages(ms.bucket, packages)
	}
	return nil
}

// findReplacementPackage returns the index of an unpaired entry in candidates
// with the same name as missing. An entry that also has the same architecture
// is preferred; otherwise the first unpaired same-name entry is used. It
// returns -1 if there is none.
//
// A linear scan is less efficient than a map, but few packages are expected
// to be installed or removed between two scans.
func findReplacementPackage(missing *Package, candidates []*Package, paired []bool) int {
	fallback := -1
	for i, candidate := range candidates {
		if paired[i] || candidate.Name != missing.Name {
			continue
		}
		if candidate.Arch == missing.Arch {
			return i
		}
		if fallback < 0 {
			fallback = i
		}
	}
	return fallback
}
// packageEvent builds the beat event for pkg.
//
//   - The ECS fields go under the root "package" key.
//   - The remaining package fields become metricset fields.
//   - event.kind, event.category, event.type and event.action are derived
//     from eventType and action.
//   - When the host ID is known, an entity_id metricset field is added.
//   - A per-package error, if any, is reported under error.message.
func (ms *MetricSet) packageEvent(pkg *Package, eventType string, action eventAction) mb.Event {
	metricsetFields, ecsFields := pkg.toMapStr()

	rootFields := mapstr.M{
		"event": mapstr.M{
			"kind":     eventType,
			"category": []string{"package"},
			"type":     []string{action.Type()},
			"action":   action.String(),
		},
		"package": ecsFields,
		"message": packageMessage(pkg, action),
	}

	if hostID := ms.HostID(); hostID != "" {
		metricsetFields["entity_id"] = pkg.entityID(hostID)
	}
	if pkg.error != nil {
		rootFields["error"] = mapstr.M{"message": pkg.error.Error()}
	}

	return mb.Event{
		RootFields:      rootFields,
		MetricSetFields: metricsetFields,
	}
}
// packageMessage renders the human-readable event message in the form
// "Package <name> (<version>) <what happened>". For an unknown action the
// trailing description is empty.
func packageMessage(pkg *Package, action eventAction) string {
	description := ""
	switch action {
	case eventActionExistingPackage:
		description = "is already installed"
	case eventActionPackageInstalled:
		description = "installed"
	case eventActionPackageRemoved:
		description = "removed"
	case eventActionPackageUpdated:
		description = "updated"
	}
	return "Package " + pkg.Name + " (" + pkg.Version + ") " + description
}
// loadStateTimestamp loads state timestamp from a bucket. This is the time
// when all package state was last emitted as events.
//
// If the stored value is empty, the zero time is returned; New treats that as
// "no state timestamp found". On error the zero time is returned as well.
func loadStateTimestamp(bucket datastore.Bucket) (time.Time, error) {
	var ts time.Time
	decode := func(blob []byte) error {
		if len(blob) == 0 {
			return nil
		}
		return ts.UnmarshalBinary(blob)
	}
	if err := bucket.Load(bucketKeyStateTimestamp, decode); err != nil {
		return time.Time{}, err
	}
	return ts, nil
}
// storeStateTimestamp stores the timestamp of the last state update to the
// given datastore bucket, encoded with time.Time.MarshalBinary (the format
// loadStateTimestamp decodes). An encoding error is returned unwrapped; a
// write error is wrapped with context.
func storeStateTimestamp(bucket datastore.Bucket, t time.Time) error {
	encoded, err := t.MarshalBinary()
	if err == nil {
		if storeErr := bucket.Store(bucketKeyStateTimestamp, encoded); storeErr != nil {
			err = fmt.Errorf("error writing state timestamp to disk: %w", storeErr)
		}
	}
	return err
}
// loadPackages loads the persisted packages from the given datastore bucket.
// It returns a nil slice when no package data has been stored yet.
func loadPackages(bucket datastore.Bucket) (packages []*Package, err error) {
	var data []byte
	err = bucket.Load(bucketKeyPackages, func(blob []byte) error {
		// The datastore is bbolt-backed (datastore.Update hands
		// migrateDatastoreSchema a *bbolt.Tx). bbolt values are only valid for
		// the lifetime of the read transaction, which ends when Load returns,
		// so copy the bytes before decoding them outside the callback.
		data = bytes.Clone(blob)
		return nil
	})
	if err != nil {
		return nil, err
	}

	if len(data) > 0 {
		packages, err = decodePackagesFromContainer(data)
		if err != nil {
			return nil, err
		}
	}
	return packages, nil
}
// storePackages stores packages to the given datastore bucket under
// bucketKeyPackages. They are serialized by encodePackages using a builder
// obtained from fbGetBuilder; the builder is handed back through its release
// function once the write has finished.
func storePackages(bucket datastore.Bucket, packages []*Package) error {
	builder, release := fbGetBuilder()
	defer release()

	err := bucket.Store(bucketKeyPackages, encodePackages(builder, packages))
	if err == nil {
		return nil
	}
	return fmt.Errorf("error persisting packages to datastore: %w", err)
}
// getPackages lists the packages of every supported package manager found on
// this system:
//   - RPM, if rpmPath exists;
//   - dpkg, if dpkgPath exists;
//   - Homebrew, using the first entry of homebrewCellarPath that exists.
//
// A package manager whose path does not exist is skipped; any other error
// aborts the listing. If no package manager is found, a warning is logged
// once per process.
func (ms *MetricSet) getPackages() ([]*Package, error) {
	packages := []*Package{}
	var foundPackageManager bool

	_, statErr := os.Stat(rpmPath)
	if statErr == nil {
		foundPackageManager = true
		if ms.config.PackageSuidDrop != nil {
			rpmPackages, err := ms.listRPMPackagesWithDroppedEUID()
			if err != nil {
				return nil, err
			}
			ms.log.Debugf("RPM packages: %v", len(rpmPackages))
			packages = append(packages, rpmPackages...)
		} else {
			rpmPackages, err := listRPMPackages(false)
			if err != nil {
				return nil, fmt.Errorf("error listing RPM packages: %w", err)
			}
			ms.log.Debugf("RPM packages: %v", len(rpmPackages))
			packages = append(packages, rpmPackages...)
		}
	} else if !errors.Is(statErr, fs.ErrNotExist) {
		return nil, fmt.Errorf("error opening %v: %w", rpmPath, statErr)
	}

	_, statErr = os.Stat(dpkgPath)
	if statErr == nil {
		foundPackageManager = true
		dpkgPackages, err := ms.listDebPackages()
		if err != nil {
			return nil, fmt.Errorf("error getting DEB packages: %w", err)
		}
		ms.log.Debugf("DEB packages: %v", len(dpkgPackages))
		packages = append(packages, dpkgPackages...)
	} else if !errors.Is(statErr, fs.ErrNotExist) {
		return nil, fmt.Errorf("error opening %v: %w", dpkgPath, statErr)
	}

	for _, path := range homebrewCellarPath {
		_, statErr = os.Stat(path)
		if statErr != nil {
			if errors.Is(statErr, fs.ErrNotExist) {
				continue
			}
			return nil, fmt.Errorf("error opening %v: %w", path, statErr)
		}
		foundPackageManager = true
		homebrewPackages, err := listBrewPackages(path)
		if err != nil {
			return nil, fmt.Errorf("error getting Homebrew packages: %w", err)
		}
		ms.log.Debugf("Homebrew packages: %v", len(homebrewPackages))
		packages = append(packages, homebrewPackages...)
		// Only the first Cellar that exists is used.
		break
	}

	if !foundPackageManager && !ms.suppressNoPackageWarnings {
		ms.log.Warnf("No supported package managers found. None of %v, %v, %v exist.",
			rpmPath, dpkgPath, strings.Join(homebrewCellarPath, ","))
		// Only warn once at the start of Auditbeat.
		ms.suppressNoPackageWarnings = true
	}

	return packages, nil
}

// listRPMPackagesWithDroppedEUID lists RPM packages while the current OS
// thread runs with its effective UID set to config.PackageSuidDrop.
//
// This is rather ugly, but needed. Older RPM setups use BDB as the database
// for the RPM state, and BDB is easy to corrupt and does not handle parallel
// operations well (see
// https://github.com/rpm-software-management/rpm/issues/232). Dropping the
// permissions means librpm cannot write to any of the DB files, so nothing can
// be corrupted, and BDB skips the failchk() operations that exhibit the
// parallel-access issues.
//
// POSIX expects all threads in a process to have identical credentials, so
// this is technically non-compliant. The change is therefore confined to a
// locked OS thread via the raw setreuid syscall.
//
// The effective UID is always restored before the listing result is
// returned. If restoring fails, the thread is deliberately left locked so the
// Go scheduler never reuses an unprivileged thread for unrelated goroutines.
func (ms *MetricSet) listRPMPackagesWithDroppedEUID() ([]*Package, error) {
	runtime.LockOSThread()
	restored := true
	defer func() {
		if restored {
			runtime.UnlockOSThread()
		} else {
			ms.log.Debugf("setreuid has failed; package query thread will remain locked")
		}
	}()

	// -1 tells setreuid to leave that ID unchanged.
	const unchanged = ^uintptr(0)
	currentUID := os.Getuid()

	_, _, errno := syscall.Syscall(syscall.SYS_SETREUID, unchanged, uintptr(*ms.config.PackageSuidDrop), unchanged)
	if errno != 0 {
		return nil, fmt.Errorf("got error from setreuid trying to drop out of root: %w", errno)
	}

	rpmPackages, listErr := listRPMPackages(true)

	// Restore the EUID before looking at listErr. Returning early here would
	// leave this thread running as the unprivileged user and then unlock it.
	_, _, errno = syscall.Syscall(syscall.SYS_SETREUID, unchanged, uintptr(currentUID), unchanged)
	if errno != 0 {
		restored = false
		return nil, errors.Join(fmt.Errorf("got error from setreuid trying to reset euid: %w", errno), listErr)
	}
	if listErr != nil {
		return nil, fmt.Errorf("got error listing RPM packages: %w", listErr)
	}
	return rpmPackages, nil
}
// listDebPackages parses the dpkg status database (<dpkgPath>/status) and
// returns the packages it describes.
//
// Parsing rules:
//   - Stanzas are separated by blank lines.
//   - A stanza whose Status starts with "deinstall ok" is skipped; removed and
//     removed-but-not-purged packages are both treated as removed.
//   - Continuation lines (starting with a space or tab) are ignored.
//   - Only these fields are read: Package, Status, Architecture, Version,
//     Description, Installed-Size and Homepage.
func (ms *MetricSet) listDebPackages() ([]*Package, error) {
	dpkgStatusFile := filepath.Join(dpkgPath, "status")

	file, err := os.Open(dpkgStatusFile)
	if err != nil {
		return nil, fmt.Errorf("error opening %s: %w", dpkgStatusFile, err)
	}
	defer file.Close()

	var (
		packages    []*Package
		skipPackage bool
		pkg         *Package
	)

	// finishPackage closes the current stanza. It appends the parsed package
	// unless the stanza was skipped or contained no fields (pkg == nil), then
	// resets the parser state.
	finishPackage := func() {
		if pkg != nil && !skipPackage {
			packages = append(packages, pkg)
		}
		pkg = nil
		skipPackage = false
	}

	scanner := bufio.NewScanner(file)
	// Allow lines longer than bufio's default 64 KiB token limit (e.g. long
	// Depends/Provides fields) instead of failing the whole listing.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.TrimSpace(line) == "" {
			// Empty line signals the end of a stanza. Repeated or leading
			// blank lines must not produce nil entries.
			finishPackage()
			continue
		}
		if skipPackage {
			// Skipping this package - read on.
			continue
		}
		if strings.HasPrefix(line, " ") || strings.HasPrefix(line, "\t") {
			// Continuation of a multi-line field; not interested for now.
			continue
		}

		key, value, found := strings.Cut(line, ":")
		if !found {
			return nil, fmt.Errorf("the following line was unexpected (no ':' found): '%s'", line)
		}
		value = strings.TrimSpace(value)

		if pkg == nil {
			pkg = &Package{
				Type: "dpkg",
			}
		}

		switch strings.ToLower(key) {
		case "package":
			pkg.Name = value
		case "status":
			if strings.HasPrefix(value, "deinstall ok") {
				// Package was removed but not purged. We report both cases as removed.
				skipPackage = true
			}
		case "architecture":
			pkg.Arch = value
		case "version":
			pkg.Version = value
		case "description":
			pkg.Summary = value
		case "installed-size":
			size, sizeErr := parseDpkgInstalledSize(value)
			if sizeErr != nil {
				// If installed size is invalid, log a warning but still
				// report the package with size=0. The parser may return a
				// non-zero value alongside an error (e.g. on overflow), so
				// reset it explicitly.
				ms.log.Warnw("Failed parsing installed size",
					"package", pkg.Name,
					"Installed-Size", value,
					"Error", sizeErr)
				size = 0
			}
			pkg.Size = size
		case "homepage":
			pkg.URL = value
		}
	}
	if err = scanner.Err(); err != nil {
		return nil, fmt.Errorf("error scanning file %v: %w", dpkgStatusFile, err)
	}

	// The final stanza may not be followed by a blank line.
	finishPackage()
	return packages, nil
}
func parseDpkgInstalledSize(value string) (size uint64, err error) {
// Installed-Size is an integer (KiB).
if size, err = strconv.ParseUint(value, 10, 64); err == nil {
return size, nil
}
// Some rare third-party packages contain a unit at the end. This is ignored
// by dpkg tools. Try to parse to return a value as close as possible
// to what the package maintainer meant.
end := len(value)
for idx, chr := range value {
if chr < '0' || chr > '9' {
end = idx
break
}
}
multiplier := uint64(1)
if end < len(value) {
switch value[end] {
case 'm', 'M':
multiplier = 1024
case 'g', 'G':
multiplier = 1024 * 1024
}
}
size, err = strconv.ParseUint(value[:end], 10, 64)
return size * multiplier, err
}
// packageV1 is the struct used in the package.v1 bucket.
// Do not modify this struct because this must remain the same as what
// was used in earlier Auditbeat releases.
//
// migrateDatastoreSchema gob-decodes the package.v1 package list into this
// type and copies each value field-by-field into a Package.
type packageV1 struct {
Name string
Version string
Release string
Arch string
License string
InstallTime time.Time
Size uint64
Summary string
URL string
Type string
// encoding/gob ignores unexported fields, so this one is never populated
// when decoding.
//nolint:unused // This field is unused, but we are keeping this struct as is.
error error
}
// migrateDatastoreSchema migrates the contents of the data store to the latest
// schema. This allows users of earlier versions of Auditbeat to upgrade to
// new versions while maintaining existing state. This handles migrating data
// from the package.v1 bucket into package.v2.
//
// It performs the migration entirely within the given write transaction such
// that if any problems occur the changes are rolled back.
//
// Nothing happens when package.v2 already exists or package.v1 is absent.
// Otherwise these steps run in order:
//   - decode the gob-encoded v1 package list;
//   - create the v2 bucket;
//   - copy the state timestamp bytes unchanged;
//   - re-encode the packages with encodePackages;
//   - delete the v1 bucket.
func migrateDatastoreSchema(tx *bbolt.Tx) error {
	const bucketNameV1 = "package.v1"

	if tx.Bucket([]byte(bucketNameV2)) != nil {
		// Already exists. No need to migrate.
		return nil
	}
	oldBucket := tx.Bucket([]byte(bucketNameV1))
	if oldBucket == nil {
		// No old data to migrate.
		return nil
	}

	log := logp.NewLogger(metricsetName)
	log.Debugf("Migrating data from %v to %v bucket.", bucketNameV1, bucketNameV2)

	var packages []*Package
	if raw := oldBucket.Get([]byte(bucketKeyPackages)); len(raw) > 0 {
		// The v1 list is a stream of gob-encoded packageV1 values read until EOF.
		decoder := gob.NewDecoder(bytes.NewReader(raw))
		for {
			var old packageV1
			decodeErr := decoder.Decode(&old)
			if errors.Is(decodeErr, io.EOF) {
				break
			}
			if decodeErr != nil {
				return fmt.Errorf("error migrating %v data: failed decoding packages: %w", bucketNameV1, decodeErr)
			}
			packages = append(packages, &Package{
				Name:        old.Name,
				Version:     old.Version,
				Release:     old.Release,
				Arch:        old.Arch,
				License:     old.License,
				InstallTime: old.InstallTime,
				Size:        old.Size,
				Summary:     old.Summary,
				URL:         old.URL,
				Type:        old.Type,
			})
		}
	}

	newBucket, err := tx.CreateBucketIfNotExists([]byte(bucketNameV2))
	if err != nil {
		return fmt.Errorf("error migrating data: failed to create %v bucket: %w", bucketNameV2, err)
	}

	// Copy the gob encoded state timestamp from the v1 bucket to the v2 bucket.
	if timestamp := oldBucket.Get([]byte(bucketKeyStateTimestamp)); timestamp != nil {
		err = newBucket.Put([]byte(bucketKeyStateTimestamp), timestamp)
		if err != nil {
			return fmt.Errorf("error migrating data: failed to write %v to %v bucket: %w", bucketKeyStateTimestamp, bucketNameV2, err)
		}
	}

	builder, release := fbGetBuilder()
	defer release()

	err = newBucket.Put([]byte(bucketKeyPackages), encodePackages(builder, packages))
	if err != nil {
		return fmt.Errorf("error migrating data: failed to write %v to %v bucket: %w", bucketKeyPackages, bucketNameV2, err)
	}

	err = tx.DeleteBucket([]byte(bucketNameV1))
	if err != nil {
		return fmt.Errorf("error migrating data: failed to delete %v bucket: %w", bucketNameV1, err)
	}

	log.Debugf("Completed migrating data from %v to %v bucket. Moved %d packages.", bucketNameV1, bucketNameV2, len(packages))
	return nil
}