auditbeat/module/file_integrity/eventreader_kprobes.go (156 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //go:build linux package file_integrity import ( "errors" "fmt" "path/filepath" "time" "github.com/elastic/beats/v7/auditbeat/module/file_integrity/kprobes" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/processors/add_process_metadata" "golang.org/x/sys/unix" "github.com/elastic/elastic-agent-libs/logp" ) type kProbesReader struct { watcher *kprobes.Monitor config Config eventC chan Event log *logp.Logger parsers []FileParser processor beat.Processor } func newKProbesReader(config Config, l *logp.Logger, parsers []FileParser) (*kProbesReader, error) { processor, err := add_process_metadata.NewWithConfig( add_process_metadata.ConfigOverwriteKeys(true), add_process_metadata.ConfigMatchPIDs([]string{"process.pid"}), ) if err != nil { return nil, err } return &kProbesReader{ config: config, eventC: make(chan Event), log: l, parsers: parsers, processor: processor, }, nil } func (r kProbesReader) Processor() beat.Processor { return r.processor } func (r kProbesReader) Start(done <-chan struct{}) (<-chan Event, error) { watcher, err := kprobes.New(r.config.Recursive) if err != nil { return nil, err } r.watcher = watcher if err := r.watcher.Start(); err != nil { // Ensure that watcher is closed so that we don't leak watchers r.watcher.Close() return nil, fmt.Errorf("unable to start watcher: %w", err) } queueDone := make(chan struct{}) queueC := make(chan []*Event) // Launch a separate goroutine to fetch all events that happen while // watches are being installed. go func() { defer close(queueC) queueC <- r.enqueueEvents(queueDone) }() // kProbes watcher needs to have the watched paths // installed after the event consumer is started, to avoid a potential // deadlock. Do it on all platforms for simplicity. for _, p := range r.config.Paths { if err := r.watcher.Add(p); err != nil { if errors.Is(err, unix.EMFILE) { r.log.Warnw("Failed to add watch (check the max number of "+ "open files allowed with 'ulimit -a')", "file_path", p, "error", err) } else { r.log.Warnw("Failed to add watch", "file_path", p, "error", err) } } } close(queueDone) events := <-queueC // Populate callee's event channel with the previously received events r.eventC = make(chan Event, 1+len(events)) for _, ev := range events { r.eventC <- *ev } go r.consumeEvents(done) r.log.Infow("Started kprobes watcher", "file_path", r.config.Paths, "recursive", r.config.Recursive) return r.eventC, nil } func (r kProbesReader) enqueueEvents(done <-chan struct{}) []*Event { var events []*Event //nolint:prealloc //can't be preallocated as the number of events is unknown for { ev := r.nextEvent(done) if ev == nil { break } events = append(events, ev) } return events } func (r kProbesReader) consumeEvents(done <-chan struct{}) { defer close(r.eventC) defer r.watcher.Close() for { ev := r.nextEvent(done) if ev == nil { r.log.Debug("kprobes reader terminated") return } r.eventC <- *ev } } func (r kProbesReader) nextEvent(done <-chan struct{}) *Event { for { select { case <-done: return nil case event := <-r.watcher.EventChannel(): if event.Path == "" || r.config.IsExcludedPath(event.Path) || !r.config.IsIncludedPath(event.Path) { continue } r.log.Debugw("Received kprobes event", "file_path", event.Path, "event_flags", event.Op) abs, err := filepath.Abs(event.Path) if err != nil { r.log.Errorw("Failed to obtain absolute path", "file_path", event.Path, "error", err, ) event.Path = filepath.Clean(event.Path) } else { event.Path = abs } start := time.Now() e := NewEvent(event.Path, kProbeTypeToAction(event.Op), SourceKProbes, r.config.MaxFileSizeBytes, r.config.HashTypes, r.parsers) if e.Process == nil { e.Process = &Process{} } e.Process.PID = event.PID e.rtt = time.Since(start) return &e case err := <-r.watcher.ErrorChannel(): if err != nil { r.log.Errorw("kprobes watcher error", "error", err) } } } } func kProbeTypeToAction(op uint32) Action { switch op { case unix.IN_CREATE, unix.IN_MOVED_TO: return Created case unix.IN_MODIFY: return Updated case unix.IN_DELETE: return Deleted case unix.IN_MOVED_FROM: return Moved case unix.IN_ATTRIB: return AttributesModified default: return 0 } }