auditbeat/module/file_integrity/kprobes/monitor.go (171 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//go:build linux
package kprobes
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-perf"
)
type MonitorEvent struct {
Path string
PID uint32
Op uint32
}
type monitorEmitter struct {
ctx context.Context
eventC chan<- MonitorEvent
}
func newMonitorEmitter(ctx context.Context, eventC chan MonitorEvent) *monitorEmitter {
return &monitorEmitter{
ctx: ctx,
eventC: eventC,
}
}
func (m *monitorEmitter) Emit(ePath string, pid uint32, op uint32) error {
select {
case <-m.ctx.Done():
return m.ctx.Err()
case m.eventC <- MonitorEvent{
Path: ePath,
PID: pid,
Op: op,
}:
return nil
}
}
type Monitor struct {
eventC chan MonitorEvent
pathMonitor *pTraverser
perfChannel perfChannel
errC chan error
eProc *eProcessor
log *logp.Logger
ctx context.Context
cancelFn context.CancelFunc
running uint32
isRecursive bool
closeErr error
}
func New(isRecursive bool) (*Monitor, error) {
ctx := context.TODO()
validatedProbes, exec, err := getVerifiedProbes(ctx, 5*time.Second)
if err != nil {
return nil, err
}
pChannel, err := newPerfChannel(validatedProbes, 10, 4096, perf.AllThreads)
if err != nil {
return nil, err
}
return newMonitor(ctx, isRecursive, pChannel, exec)
}
func newMonitor(ctx context.Context, isRecursive bool, pChannel perfChannel, exec executor) (*Monitor, error) {
mCtx, cancelFunc := context.WithCancel(ctx)
p, err := newPathMonitor(mCtx, exec, 0, isRecursive)
if err != nil {
cancelFunc()
return nil, err
}
eventChannel := make(chan MonitorEvent, 512)
eProc := newEventProcessor(p, newMonitorEmitter(mCtx, eventChannel), isRecursive)
return &Monitor{
eventC: eventChannel,
pathMonitor: p,
perfChannel: pChannel,
errC: make(chan error, 1),
eProc: eProc,
log: logp.NewLogger("file_integrity"),
ctx: mCtx,
cancelFn: cancelFunc,
isRecursive: isRecursive,
closeErr: nil,
}, nil
}
func (w *Monitor) Add(path string) error {
switch atomic.LoadUint32(&w.running) {
case 0:
return errors.New("monitor not started")
case 2:
return errors.New("monitor is closed")
}
return w.pathMonitor.AddPathToMonitor(w.ctx, path)
}
func (w *Monitor) Close() error {
if !atomic.CompareAndSwapUint32(&w.running, 1, 2) {
switch atomic.LoadUint32(&w.running) {
case 0:
// monitor hasn't started yet
atomic.StoreUint32(&w.running, 2)
default:
return nil
}
}
w.cancelFn()
var allErr error
allErr = errors.Join(allErr, w.pathMonitor.Close())
allErr = errors.Join(allErr, w.perfChannel.Close())
return allErr
}
func (w *Monitor) EventChannel() <-chan MonitorEvent {
return w.eventC
}
func (w *Monitor) ErrorChannel() <-chan error {
return w.errC
}
func (w *Monitor) writeErr(err error) {
select {
case w.errC <- err:
case <-w.ctx.Done():
}
}
func (w *Monitor) Start() error {
if !atomic.CompareAndSwapUint32(&w.running, 0, 1) {
return errors.New("monitor already started")
}
if err := w.perfChannel.Run(); err != nil {
if closeErr := w.Close(); closeErr != nil {
w.log.Warnf("error at closing watcher: %v", closeErr)
}
return err
}
go func() {
defer func() {
close(w.eventC)
if closeErr := w.Close(); closeErr != nil {
w.log.Warnf("error at closing watcher: %v", closeErr)
}
}()
for {
select {
case <-w.ctx.Done():
return
case e, ok := <-w.perfChannel.C():
if !ok {
w.writeErr(fmt.Errorf("read invalid event from perf channel"))
return
}
switch eWithType := e.(type) {
case *ProbeEvent:
if err := w.eProc.process(w.ctx, eWithType); err != nil {
w.writeErr(err)
return
}
continue
default:
w.writeErr(errors.New("unexpected event type"))
return
}
case err := <-w.perfChannel.ErrC():
w.writeErr(err)
return
case lost := <-w.perfChannel.LostC():
w.writeErr(fmt.Errorf("events lost %d", lost))
return
case err := <-w.pathMonitor.ErrC():
w.writeErr(err)
return
}
}
}()
return nil
}