libbeat/testing/integration/run_beat.go (251 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package integration
import (
"bufio"
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"testing"
)
var (
	// compiling serializes `EnsureCompiled` calls: the function holds this
	// mutex from start to finish.
	compiling sync.Mutex
	// compiled maps a Beat name to the hash of the binary that the
	// `EnsureCompiled` function built for it.
	compiled = make(map[string]string)
	// hash is the SHA-256 state that `hashBinary` resets and reuses on
	// every call.
	hash = sha256.New()
)
// RunningBeat describes the running Beat binary: the underlying command,
// the output lines collected from it so far and the watcher inspecting them.
type RunningBeat struct {
	// c is the command of the Beat process.
	c *exec.Cmd

	// outputRW guards `output` and `watcher`.
	outputRW sync.RWMutex
	// output holds every line read from the process, in arrival order.
	output []string
	// outputDone is closed once all the output has been processed.
	outputDone chan struct{}

	// watcher inspects every new output line; it is set to nil
	// after it reports that the expected output has been observed.
	watcher OutputWatcher
	// keepRunning prevents killing the process once the watcher
	// has observed the expected output.
	keepRunning bool
}
// CollectOutput returns the last `limit` lines of the currently
// accumulated output, joined with newlines.
//
// A negative `limit` (for example `limit=-1`) returns the entire output
// from the beginning.
//
// Lines that can be decoded as a JSON object are pretty-printed,
// every other line is returned verbatim.
func (b *RunningBeat) CollectOutput(limit int) string {
	b.outputRW.RLock()
	defer b.outputRW.RUnlock()

	output := b.output
	if limit >= 0 && len(output) > limit {
		output = output[len(output)-limit:]
	}

	var builder strings.Builder
	for i, l := range output {
		if i > 0 {
			builder.WriteByte('\n')
		}

		// A fresh map is required for every line: `json.Unmarshal` keeps the
		// existing entries of a non-nil map, so a shared map would leak keys
		// of the previous lines into the current one.
		var m map[string]any
		if err := json.Unmarshal([]byte(l), &m); err != nil {
			// not a JSON object, keep the line as is
			builder.WriteString(l)
			continue
		}

		pretty, err := json.MarshalIndent(m, "", " ")
		if err != nil {
			// fall back to the raw line instead of dropping it
			builder.WriteString(l)
			continue
		}
		builder.Write(pretty)
	}

	return builder.String()
}
// Wait blocks until all the output of the Beat is processed and the
// process exits. It returns the error reported by the underlying
// command, if any.
func (b *RunningBeat) Wait() error {
	// The output must be fully consumed before calling `Cmd.Wait`:
	// `Wait` closes the stdout/stderr pipes once the process exits, so
	// calling it while the readers are still running can lose the tail
	// of the output.
	<-b.outputDone
	return b.c.Wait()
}
// writeOutputLine records a single output line and passes it to the
// watcher, if there is still one attached.
//
// When the watcher reports that the expected output has been observed,
// the watcher is detached and, unless `keepRunning` is set, the process
// is killed.
func (b *RunningBeat) writeOutputLine(line string) {
	b.outputRW.Lock()
	defer b.outputRW.Unlock()

	b.output = append(b.output, line)

	if b.watcher != nil {
		b.watcher.Inspect(line)
		if !b.watcher.Observed() {
			return
		}
		if !b.keepRunning {
			// the result of the kill is ignored
			_ = b.c.Process.Kill()
		}
		b.watcher = nil
	}
}
// RunBeatOptions describes the options for running a Beat.
type RunBeatOptions struct {
	// Beatname is the name of the Beat to run, for example "filebeat".
	Beatname string

	// Config is the Beat configuration written in YAML.
	Config string

	// Args sets additional arguments to pass when running the binary.
	Args []string

	// KeepRunning, when set to `true`, keeps the process alive after
	// all the expected output has been observed.
	//
	// In this case the user controls the runtime through the context
	// passed in `RunBeat`.
	KeepRunning bool
}
// RunBeat runs a Beat binary with the given config and args.
//
// The config from `opts` is written to a file in a temporary directory of
// the test, the same directory hosts the `path.home` of the Beat. The Beat
// logs to stderr on the debug level. Every line written by the process to
// stdout or stderr is recorded and passed to `watcher`, if it is not nil.
//
// The process is bound to `ctx`. Any failure while preparing or starting
// the process fails the test immediately.
//
// Returns a `RunningBeat` that allows to collect the output and wait until the exit.
func RunBeat(ctx context.Context, t *testing.T, opts RunBeatOptions, watcher OutputWatcher) *RunningBeat {
	t.Logf("preparing to run %s...", opts.Beatname)

	binaryFilename := findBeatBinaryPath(t, opts.Beatname)
	dir := t.TempDir()

	// create a temporary Beat config
	cfgPath := filepath.Join(dir, fmt.Sprintf("%s.yml", opts.Beatname))
	homePath := filepath.Join(dir, "home")
	err := os.WriteFile(cfgPath, []byte(opts.Config), 0644)
	if err != nil {
		t.Fatalf("failed to create a temporary config file: %s", err)
		return nil
	}
	t.Logf("temporary config has been created at %s", cfgPath)

	// compute the args for execution
	baseArgs := []string{
		// logging to stderr instead of log files
		"-e",
		"-c", cfgPath,
		// we want all the logs
		"-E", "logging.level=debug",
		// so we can run multiple Beats at the same time
		"--path.home", homePath,
	}
	execArgs := make([]string, 0, len(baseArgs)+len(opts.Args))
	execArgs = append(execArgs, baseArgs...)
	execArgs = append(execArgs, opts.Args...)

	t.Logf("running %s %s", binaryFilename, strings.Join(execArgs, " "))
	c := exec.CommandContext(ctx, binaryFilename, execArgs...)

	// we must use 2 pipes since writes are not aligned by lines
	// part of the stdout output can end up in the middle of the stderr line
	stdout, err := c.StdoutPipe()
	if err != nil {
		t.Fatalf("failed to create the stdout pipe: %s", err)
		return nil
	}
	stderr, err := c.StderrPipe()
	if err != nil {
		t.Fatalf("failed to create the stderr pipe: %s", err)
		return nil
	}

	b := &RunningBeat{
		c:           c,
		watcher:     watcher,
		keepRunning: opts.KeepRunning,
		outputDone:  make(chan struct{}),
	}

	// The process is started before the output consumers are spawned:
	// the consumers may need `c.Process` to kill the Beat, and it is set
	// by `Start`. Output written in the meantime stays buffered in the pipes.
	err = c.Start()
	if err != nil {
		t.Fatalf("failed to start %s command: %s", opts.Beatname, err)
		return nil
	}
	t.Logf("%s is running (pid: %d)", binaryFilename, c.Process.Pid)

	// arbitrary buffer size
	output := make(chan string, 128)

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		processPipe(t, stdout, output)
	}()
	go func() {
		defer wg.Done()
		processPipe(t, stderr, output)
	}()

	// close the lines channel once both pipes are exhausted
	go func() {
		wg.Wait()
		close(output)
	}()

	// record the lines and signal when all of them are processed
	go func() {
		for line := range output {
			b.writeOutputLine(line)
		}
		close(b.outputDone)
	}()

	return b
}
// processPipe reads `r` line by line and sends every line, stripped of its
// line terminator (`\n` or `\r\n`), to `output`. It returns when the reader
// is exhausted or fails; a failure other than `io.EOF` is logged through `t`.
//
// The lines are read with a `bufio.Reader` instead of a `bufio.Scanner`
// because the scanner gives up on lines longer than 64 KiB, which would
// drop the rest of the output and leave the pipe undrained.
func processPipe(t *testing.T, r io.Reader, output chan<- string) {
	reader := bufio.NewReader(r)
	for {
		line, err := reader.ReadString('\n')
		// `line` is empty only when nothing was read before the error,
		// a blank line still carries its terminator at this point.
		if line != "" {
			line = strings.TrimSuffix(line, "\n")
			line = strings.TrimSuffix(line, "\r")
			output <- line
		}
		if err != nil {
			if !errors.Is(err, io.EOF) {
				t.Logf("error while reading from stdout/stderr: %s", err)
			}
			return
		}
	}
}
// EnsureCompiled ensures that the given Beat is compiled and ready
// to run.
//
// This function allows using only the binaries built by this function:
// a binary found on disk is reused only if its hash matches the hash
// recorded after the last build, externally created binaries are removed
// and rebuilt.
//
// The binary is built by running `mage build` in the directory of the
// binary, the command is bound to `ctx`. Calls are serialized, so only one
// build runs at a time.
//
// Returns the path to the binary. Any failure fails the test immediately.
func EnsureCompiled(ctx context.Context, t *testing.T, beatname string) (path string) {
	compiling.Lock()
	defer compiling.Unlock()

	t.Logf("ensuring the %s binary is available...", beatname)

	binaryFilename := findBeatBinaryPath(t, beatname)
	// empty if the binary was not compiled before
	expectedHash := compiled[beatname]

	// we allow to use binaries only built by this function.
	// binaries from different origins are marked as outdated
	_, err := os.Stat(binaryFilename)
	switch {
	case err == nil:
		actualHash := hashBinary(t, binaryFilename)
		if actualHash == expectedHash {
			t.Logf("%s binary has been compiled before at %s, using...", beatname, binaryFilename)
			return binaryFilename
		}
		t.Logf("found outdated %s binary at %s, removing...", beatname, binaryFilename)
		if err := os.Remove(binaryFilename); err != nil {
			t.Fatalf("failed to remove outdated %s binary at %s: %s", beatname, binaryFilename, err)
			return ""
		}
	case errors.Is(err, os.ErrNotExist):
		t.Logf("%s binary was not found at %s", beatname, binaryFilename)
	default:
		t.Fatalf("failed to check for compiled binary %s: %s", binaryFilename, err)
		return ""
	}

	mageCommand := "mage"
	if runtime.GOOS == "windows" {
		mageCommand += ".exe"
	}
	args := []string{"build"}
	t.Logf("building %s binary with \"%s %s\"... ", binaryFilename, mageCommand, strings.Join(args, " "))
	c := exec.CommandContext(ctx, mageCommand, args...)
	c.Dir = filepath.Dir(binaryFilename)
	output, err := c.CombinedOutput()
	if err != nil {
		t.Fatalf("failed to build %s binary: %s\n%s", beatname, err, output)
		return ""
	}

	// A successful build that left no usable binary behind must fail the
	// test regardless of the reason, including the binary simply missing.
	if _, err = os.Stat(binaryFilename); err != nil {
		t.Fatalf("building command for binary %s succeeded but the binary was not created: %s", binaryFilename, err)
		return ""
	}

	t.Logf("%s binary has been successfully built ", binaryFilename)
	compiled[beatname] = hashBinary(t, binaryFilename)
	return binaryFilename
}
// hashBinary returns the hex-encoded SHA-256 digest of the content of
// the given file. It fails the test if the file cannot be opened or read.
//
// The function reuses the package-level `hash` state, resetting it
// before every computation.
func hashBinary(t *testing.T, filename string) string {
	file, openErr := os.Open(filename)
	if openErr != nil {
		t.Fatalf("failed to open %s: %s", filename, openErr)
		return ""
	}
	defer file.Close()

	hash.Reset()
	_, copyErr := io.Copy(hash, file)
	if copyErr != nil {
		t.Fatalf("failed to hash %s: %s", filename, copyErr)
		return ""
	}

	digest := hash.Sum(nil)
	return fmt.Sprintf("%x", digest)
}
// findBeatDir searches for a directory called `beatName`, starting in the
// current working directory and walking up through its parents until the
// root of the filesystem is reached.
//
// Returns the path to the first matching directory. It fails the test if
// the working directory cannot be determined or no directory is found.
func findBeatDir(t *testing.T, beatName string) string {
	pwd, err := os.Getwd()
	if err != nil {
		t.Fatalf("failed to get the working directory: %s", err)
		return ""
	}
	t.Logf("searching for the %s directory, starting with %s...", beatName, pwd)

	for {
		candidate := filepath.Join(pwd, beatName)
		// Any stat failure (not only a missing entry) means this level has
		// no usable directory; `stat` is nil in that case and must not be
		// dereferenced.
		stat, err := os.Stat(candidate)
		if err == nil && stat.IsDir() {
			return candidate
		}

		// `filepath.Dir` of the root is the root itself, it never becomes
		// empty, so this is the only reliable stop condition.
		parent := filepath.Dir(pwd)
		if parent == pwd {
			break
		}
		pwd = parent
	}

	t.Fatalf("could not find the %s base directory", beatName)
	return ""
}
// findBeatBinaryPath returns the expected location of the Beat binary:
// a file named after the Beat inside of the Beat directory, with the
// `.exe` extension appended on Windows.
func findBeatBinaryPath(t *testing.T, beatname string) string {
	beatDir := findBeatDir(t, beatname)
	t.Logf("found %s directory at %s", beatname, beatDir)

	extension := ""
	if runtime.GOOS == "windows" {
		extension = ".exe"
	}
	return filepath.Join(beatDir, beatname) + extension
}