auditbeat/module/file_integrity/kprobes/verifier.go (160 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //go:build linux package kprobes import ( "bytes" "context" "embed" "errors" "io/fs" "os" "strings" "time" "github.com/elastic/beats/v7/auditbeat/tracing" tkbtf "github.com/elastic/tk-btf" ) //go:embed embed var embedBTFFolder embed.FS func getVerifiedProbes(ctx context.Context, timeout time.Duration) (map[tracing.Probe]tracing.AllocateFn, executor, error) { fExec := newFixedThreadExecutor(ctx) probeMgr, err := newProbeManager(fExec) if err != nil { return nil, nil, err } specs, err := loadAllSpecs() if err != nil { return nil, nil, err } var allErr error for len(specs) > 0 { s := specs[0] if !probeMgr.shouldBuild(s) { specs = specs[1:] continue } probes, err := probeMgr.build(s) if err != nil { allErr = errors.Join(allErr, err) specs = specs[1:] continue } if err := verify(ctx, fExec, probes, timeout); err != nil { if probeMgr.onErr(err) { continue } allErr = errors.Join(allErr, err) specs = specs[1:] continue } return probes, fExec, nil } fExec.Close() return nil, nil, errors.Join(allErr, errors.New("could not validate probes")) } func loadAllSpecs() ([]*tkbtf.Spec, error) { var specs []*tkbtf.Spec spec, err := tkbtf.NewSpecFromKernel() if err != nil { if !errors.Is(err, tkbtf.ErrSpecKernelNotSupported) { return nil, err } } else { specs = append(specs, spec) } embeddedSpecs, err := loadEmbeddedSpecs() if err != nil { return nil, err } specs = append(specs, embeddedSpecs...) return specs, nil } func loadEmbeddedSpecs() ([]*tkbtf.Spec, error) { var specs []*tkbtf.Spec err := fs.WalkDir(embedBTFFolder, ".", func(path string, d fs.DirEntry, err error) error { if err != nil { return err } if !strings.HasSuffix(path, ".btf") { return nil } embedFileBytes, err := embedBTFFolder.ReadFile(path) if err != nil { return err } embedSpec, err := tkbtf.NewSpecFromReader(bytes.NewReader(embedFileBytes), nil) if err != nil { return err } specs = append(specs, embedSpec) return nil }) if err != nil { return nil, err } return specs, nil } func verify(ctx context.Context, exec executor, probes map[tracing.Probe]tracing.AllocateFn, timeout time.Duration) error { basePath, err := os.MkdirTemp("", "verifier") if err != nil { return err } defer os.RemoveAll(basePath) verifier, err := newEventsVerifier(basePath) if err != nil { return err } pChannel, err := newPerfChannel(probes, 4, 512, exec.GetTID()) if err != nil { return err } m, err := newMonitor(ctx, true, pChannel, exec) if err != nil { return err } defer m.Close() // start the monitor if err := m.Start(); err != nil { return err } // spaw goroutine to send events to verifier to be verified cancel := make(chan struct{}) defer close(cancel) retC := make(chan error) go func() { defer close(retC) for { select { case runErr := <-m.ErrorChannel(): retC <- runErr return case ev, ok := <-m.EventChannel(): if !ok { retC <- errors.New("monitor closed unexpectedly") return } if err := verifier.validateEvent(ev.Path, ev.PID, ev.Op); err != nil { retC <- err return } continue case <-time.After(timeout): return case <-cancel: return } } }() // add verify base path to monitor if err := m.Add(basePath); err != nil { return err } // invoke verifier event generation from our executor if err := exec.Run(verifier.GenerateEvents); err != nil { return err } // wait for either no new events arriving for timeout duration or // ctx to be cancelled select { case err = <-retC: if err != nil { return err } case <-ctx.Done(): return ctx.Err() } // check that all events have been verified if err := verifier.Verified(); err != nil { return err } return nil }