testing/testrunner/ebpfrunner.go (153 lines of code) (raw):

// SPDX-License-Identifier: Elastic-2.0 /* * Copyright 2022 Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under * one or more contributor license agreements. Licensed under the Elastic * License 2.0; you may not use this file except in compliance with the Elastic * License 2.0. */ package testrunner import ( "bufio" "context" "encoding/json" "fmt" "io" "os/exec" "strings" "testing" "time" "github.com/stretchr/testify/require" ) type Runner struct { ctx context.Context Cmd *exec.Cmd Stdout io.ReadCloser Stderr io.ReadCloser StdoutChan chan string StderrChan chan string readChan chan string doneChan chan struct{} errChan chan error InitMsg InitMsg t *testing.T // buffers carry lifelong copy of stderr/stdout errBuff []string outBuff []string readCursor int } func runStreamChannel(sender chan string, errChan chan error, buffer *bufio.Scanner) { buf := make([]byte, 0, 64*1024) buffer.Buffer(buf, 1024*1024) go func() { for buffer.Scan() { line := buffer.Text() txt := strings.TrimSpace(line) if len(txt) > 0 { sender <- txt } } // the go testing libraries don't like it when you call // t.Fail() in a child thread; so we have to trickle down the failure if err := buffer.Err(); err != nil { errChan <- fmt.Errorf("error in buffer: %w", err) return } }() } func (runner *Runner) runIORead() { runner.readCursor = 0 defer func() { close(runner.readChan) }() for { // this select case must never block, or else the underlying write() syscalls in EventsTrace // could block. select { case <-runner.doneChan: return case <-runner.ctx.Done(): runner.t.Logf("got context done") return case line := <-runner.StderrChan: runner.errBuff = append(runner.errBuff, line) case line := <-runner.StdoutChan: runner.outBuff = append(runner.outBuff, line) select { case runner.readChan <- runner.outBuff[runner.readCursor]: runner.readCursor += 1 default: } } } } func (runner *Runner) Start() { err := runner.Cmd.Start() require.NoError(runner.t, err, "error starting EventsTrace. You may need to run `make build` and `make package`") stderrStream := bufio.NewScanner(runner.Stderr) stdoutStream := bufio.NewScanner(runner.Stdout) runStreamChannel(runner.StdoutChan, runner.errChan, stdoutStream) runStreamChannel(runner.StderrChan, runner.errChan, stderrStream) go func() { runner.runIORead() }() // run until we get the first log line select { case <-runner.ctx.Done(): runner.t.Fatalf("timed out while waiting for initial response from EventsTrace") case line := <-runner.readChan: err := json.Unmarshal([]byte(line), &runner.InitMsg) require.NoError(runner.t, err, "could not unmarshall json of first line. Stderr: \n", runner.errBuff) } } func (runner *Runner) GetNextEventOut(types ...string) string { ctx, cancel := context.WithTimeout(runner.ctx, time.Minute) defer cancel() type baseEvent struct { EventType string `json:"event_type"` } for { select { case <-ctx.Done(): runner.t.Fatalf("timed out waiting for %v events", types) case err := <-runner.errChan: require.NoError(runner.t, err, "error reading from stdout/stderr in buffer") case line := <-runner.readChan: var resp baseEvent err := json.Unmarshal([]byte(line), &resp) require.NoError(runner.t, err, "error unmarshaling event_type from event %s", line) for _, evtType := range types { if evtType == resp.EventType { return line } } } } } func (runner *Runner) UnmarshalNextEvent(body any, types ...string) { line := runner.GetNextEventOut(types...) err := json.Unmarshal([]byte(line), &body) require.NoError(runner.t, err, "error unmarshaling JSON for types %v", types) } func (runner *Runner) Stop() { runner.doneChan <- struct{}{} err := runner.Cmd.Process.Kill() require.NoError(runner.t, err) _, err = runner.Cmd.Process.Wait() require.NoError(runner.t, err) } func (runner *Runner) Dump() { runner.t.Logf("STDOUT: \n") for _, line := range runner.outBuff { runner.t.Logf("%s", line) } runner.t.Logf("STDERR: \n") for _, line := range runner.errBuff { runner.t.Logf("%s", line) } } func NewEbpfRunner(ctx context.Context, t *testing.T, args ...string) *Runner { testRunner := &Runner{ ctx: ctx, StdoutChan: make(chan string, 1024), StderrChan: make(chan string, 1024), readChan: make(chan string, 1024), doneChan: make(chan struct{}), errChan: make(chan error, 1), t: t, } args = append(args, "--print-features-on-init", "--unbuffer-stdout", "--libbpf-verbose") testRunner.Cmd = exec.CommandContext(ctx, eventsTracePath, args...) var err error testRunner.Stdout, err = testRunner.Cmd.StdoutPipe() require.NoError(t, err, "failed to redirect stdout") testRunner.Stderr, err = testRunner.Cmd.StderrPipe() require.NoError(t, err, "failed to redirect stderr") return testRunner }