pkg/testing/fixture.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package testing

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/otiai10/copy"
	"github.com/stretchr/testify/require"
	"gopkg.in/yaml.v2"

	"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
	"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
	"github.com/elastic/elastic-agent/internal/pkg/agent/install"
	"github.com/elastic/elastic-agent/pkg/component"
	"github.com/elastic/elastic-agent/pkg/control"
	"github.com/elastic/elastic-agent/pkg/control/v2/client"
	"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
	"github.com/elastic/elastic-agent/pkg/core/process"
)

// Fixture handles the setup and management of the Elastic Agent.
type Fixture struct {
	t       *testing.T
	version string
	caller  string

	fetcher         Fetcher
	operatingSystem string
	architecture    string
	packageFormat   string
	logOutput       bool
	allowErrs       bool
	connectTimout   time.Duration
	binaryName      string
	runLength       time.Duration
	additionalArgs  []string

	srcPackage string
	workDir    string
	extractDir string

	installed   bool
	installOpts *InstallOpts

	c   client.Client
	cMx sync.RWMutex

	// Uninstall token value that is needed for the agent uninstall if it's tamper protected
	uninstallToken string

	// fileNamePrefix is a prefix to be used when saving files from this test.
	// It's set by FileNamePrefix and, once set, FileNamePrefix will return
	// its value.
	fileNamePrefix string
}

// FixtureOpt is an option for the fixture.
type FixtureOpt func(s *Fixture)

// WithFetcher changes the fetcher that is used for the fixture.
func WithFetcher(fetcher Fetcher) FixtureOpt {
	if fetcher == nil {
		panic("fetcher cannot be nil")
	}
	return func(f *Fixture) {
		f.fetcher = fetcher
	}
}

// WithOSArchitecture changes the operating system and the architecture to use for the fixture.
// By default, the runtime operating system and architecture are selected.
func WithOSArchitecture(operatingSystem string, architecture string) FixtureOpt {
	return func(f *Fixture) {
		f.operatingSystem = operatingSystem
		f.architecture = architecture
	}
}

// WithPackageFormat changes the package format to use for the fixture.
// By default, targz is picked, except on Windows which uses zip.
func WithPackageFormat(packageFormat string) FixtureOpt {
	return func(f *Fixture) {
		f.packageFormat = packageFormat
	}
}

// WithLogOutput instructs the fixture to log all Elastic Agent output to the test log.
// By default, the Elastic Agent output will not be logged to the test logger.
func WithLogOutput() FixtureOpt {
	return func(f *Fixture) {
		f.logOutput = true
	}
}

// WithAllowErrors instructs the fixture to allow the Elastic Agent to log errors.
// By default, the Fixture does not allow the Elastic Agent to log any errors; logging any error
// will result in the Fixture killing the Elastic Agent and reporting it as an error.
func WithAllowErrors() FixtureOpt {
	return func(f *Fixture) {
		f.allowErrs = true
	}
}
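// Options compose left to right when passed to NewFixture. A minimal hedged
// sketch (the version string and the chosen OS/architecture are illustrative
// assumptions):
//
//	f, err := NewFixture(t, "8.13.0",
//		WithFetcher(ArtifactFetcher()),
//		WithOSArchitecture("linux", "arm64"),
//		WithLogOutput(),
//	)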
// WithConnectTimeout changes the timeout for connecting to the spawned Elastic Agent control protocol.
// By default, the timeout is 15 seconds.
func WithConnectTimeout(timeout time.Duration) FixtureOpt {
	return func(f *Fixture) {
		f.connectTimout = timeout
	}
}

// WithBinaryName sets the name of the binary under test, in cases where tests aren't being run against elastic-agent.
func WithBinaryName(name string) FixtureOpt {
	return func(f *Fixture) {
		f.binaryName = name
	}
}

// WithRunLength sets the total time the binary will run.
func WithRunLength(run time.Duration) FixtureOpt {
	return func(f *Fixture) {
		f.runLength = run
	}
}

// WithAdditionalArgs sets additional arguments to be passed to the binary under test.
func WithAdditionalArgs(args []string) FixtureOpt {
	return func(f *Fixture) {
		f.additionalArgs = args
	}
}

// NewFixture creates a new fixture to set up and manage the Elastic Agent.
func NewFixture(t *testing.T, version string, opts ...FixtureOpt) (*Fixture, error) {
	// we store the caller so the fixture can find the cache directory for the artifacts that
	// are used for the testing with the Elastic Agent.
	//
	// runtime.Caller(1) is used because we want the filename of the caller, not the path of
	// ourselves on the filesystem.
	_, caller, _, ok := runtime.Caller(1)
	if !ok {
		return nil, errors.New("unable to determine caller's file path")
	}
	pkgFormat := "targz"
	if runtime.GOOS == "windows" {
		pkgFormat = "zip"
	}
	f := &Fixture{
		t:               t,
		version:         version,
		caller:          caller,
		fetcher:         ArtifactFetcher(),
		operatingSystem: runtime.GOOS,
		architecture:    runtime.GOARCH,
		packageFormat:   pkgFormat,
		connectTimout:   15 * time.Second,
		// default to elastic-agent, can be changed by a set FixtureOpt below
		binaryName: "elastic-agent",
	}
	for _, o := range opts {
		o(f)
	}
	return f, nil
}

// Client returns the Elastic Agent communication client.
func (f *Fixture) Client() client.Client {
	f.cMx.RLock()
	defer f.cMx.RUnlock()
	return f.c
}

// Version returns the Elastic Agent version.
func (f *Fixture) Version() string {
	return f.version
}

// Prepare prepares the Elastic Agent for usage.
//
// This must be called before `Configure`, `Run`, or `Install` can be called.
// `components` defines the components that you want to be prepared for the
// Elastic Agent. See the `UsableComponent` structure for details on defining
// usable components.
//
// Note: If no `components` are defined then the Elastic Agent will keep all the components that are shipped with the
// fetched build of the Elastic Agent.
func (f *Fixture) Prepare(ctx context.Context, components ...UsableComponent) error {
	err := validateComponents(components...)
	if err != nil {
		return err
	}
	if f.extractDir != "" {
		// already prepared
		return fmt.Errorf("already been prepared")
	}
	src, err := f.fetch(ctx)
	if err != nil {
		return err
	}
	f.srcPackage = src
	filename := filepath.Base(src)

	// Determine name of extracted Agent artifact directory from
	// the artifact filename.
	name, _, err := splitFileType(filename)
	if err != nil {
		return err
	}

	// If the name has "-fips" in it, remove that part because
	// the extracted directory does not have that in it, even though
	// the artifact filename does.
	name = strings.Replace(name, "-fips", "", 1)

	extractDir := createTempDir(f.t)
	finalDir := filepath.Join(extractDir, name)
	err = ExtractArtifact(f.t, src, extractDir)
	if err != nil {
		return fmt.Errorf("extracting artifact %q in %q: %w", src, extractDir, err)
	}
	err = f.prepareComponents(finalDir, components...)
	if err != nil {
		return err
	}
	f.extractDir = finalDir
	f.workDir = finalDir
	return nil
}
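// A hedged end-to-end sketch of preparing a fixture inside a test (the
// version string and the timeout are illustrative assumptions):
//
//	func TestMyAgent(t *testing.T) {
//		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
//		defer cancel()
//		fix, err := NewFixture(t, "8.13.0", WithLogOutput())
//		require.NoError(t, err)
//		require.NoError(t, fix.Prepare(ctx))
//	}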
// WriteFileToWorkDir sends a file to the working directory alongside the unpacked tar build.
func (f *Fixture) WriteFileToWorkDir(ctx context.Context, data string, name string) error {
	err := f.EnsurePrepared(ctx)
	if err != nil {
		return fmt.Errorf("error preparing binary: %w", err)
	}

	err = os.WriteFile(filepath.Join(f.workDir, name), []byte(data), 0644)
	if err != nil {
		return fmt.Errorf("error writing file: %w", err)
	}
	f.t.Logf("wrote %s to %s", name, f.workDir)
	return nil
}

// Configure replaces the default Agent configuration file with the provided
// configuration. This must be called after `Prepare` is called but before `Run`
// or `Install` can be called.
func (f *Fixture) Configure(ctx context.Context, yamlConfig []byte) error {
	err := f.EnsurePrepared(ctx)
	if err != nil {
		return err
	}

	cfgFilePath := filepath.Join(f.workDir, "elastic-agent.yml")
	return os.WriteFile(cfgFilePath, yamlConfig, 0600)
}

// SetUninstallToken sets the uninstall token.
func (f *Fixture) SetUninstallToken(uninstallToken string) {
	f.uninstallToken = uninstallToken
}

// WorkDir returns the installed fixture's work dir AKA base dir AKA top dir. This
// must be called after `Install` is called.
func (f *Fixture) WorkDir() string {
	return f.workDir
}

// SrcPackage returns the location on disk of the Elastic Agent package used by this fixture.
func (f *Fixture) SrcPackage(ctx context.Context) (string, error) {
	err := f.EnsurePrepared(ctx)
	if err != nil {
		return "", err
	}
	return f.srcPackage, nil
}

// PackageFormat returns the package format for the fixture.
func (f *Fixture) PackageFormat() string {
	return f.packageFormat
}

// ExtractArtifact extracts the given artifact into outputDir, choosing the
// extraction method based on the artifact's file extension.
func ExtractArtifact(l Logger, artifactFile, outputDir string) error {
	filename := filepath.Base(artifactFile)
	_, ext, err := splitFileType(filename)
	if err != nil {
		return err
	}
	l.Logf("Extracting artifact %s to %s", filename, outputDir)
	switch ext {
	case ".tar.gz":
		err := untar(artifactFile, outputDir)
		if err != nil {
			return fmt.Errorf("failed to untar %s: %w", artifactFile, err)
		}
	case ".zip":
		err := unzip(artifactFile, outputDir)
		if err != nil {
			return fmt.Errorf("failed to unzip %s: %w", artifactFile, err)
		}
	case ".deb", ".rpm":
		err := copy.Copy(artifactFile, filepath.Join(outputDir, filepath.Base(artifactFile)))
		if err != nil {
			return fmt.Errorf("failed to copy %s to %s: %w", artifactFile, outputDir, err)
		}
	}
	l.Logf("Completed extraction of artifact %s to %s", filename, outputDir)
	return nil
}
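// A usage sketch for ExtractArtifact; the archive path is a hypothetical
// example, and any agent .tar.gz/.zip package should behave the same way:
//
//	dst := t.TempDir()
//	err := ExtractArtifact(t, "/tmp/elastic-agent-8.13.0-linux-x86_64.tar.gz", dst)
//	require.NoError(t, err)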
// RunBeat runs the given beat.
// The beat will run until an error, or until the given timeout is reached.
func (f *Fixture) RunBeat(ctx context.Context) error {
	if f.binaryName == "elastic-agent" {
		return errors.New("RunBeat() can't be run against elastic-agent")
	}

	if _, deadlineSet := ctx.Deadline(); !deadlineSet {
		f.t.Fatal("Context passed to Fixture.RunBeat() has no deadline set.")
	}

	var err error
	err = f.EnsurePrepared(ctx)
	if err != nil {
		return fmt.Errorf("error preparing beat: %w", err)
	}

	var logProxy Logger
	if f.logOutput {
		logProxy = f.t
	}
	stdOut := newLogWatcher(logProxy)
	stdErr := newLogWatcher(logProxy)

	args := []string{"run", "-e", "-c", filepath.Join(f.workDir, fmt.Sprintf("%s.yml", f.binaryName))}
	args = append(args, f.additionalArgs...)

	proc, err := process.Start(
		f.binaryPath(),
		process.WithContext(ctx),
		process.WithArgs(args),
		process.WithCmdOptions(attachOutErr(stdOut, stdErr)))
	if err != nil {
		return fmt.Errorf("failed to spawn %s: %w", f.binaryName, err)
	}

	procWaitCh := proc.Wait()
	killProc := func() {
		_ = proc.Kill()
		<-procWaitCh
	}

	var doneChan <-chan time.Time
	if f.runLength != 0 {
		doneChan = time.After(f.runLength)
	}

	stopping := false
	for {
		select {
		case <-ctx.Done():
			killProc()
			return ctx.Err()
		case ps := <-procWaitCh:
			if stopping {
				return nil
			}
			return fmt.Errorf("elastic-agent exited unexpectedly with exit code: %d", ps.ExitCode())
		case err := <-stdOut.Watch():
			if !f.allowErrs {
				// no errors allowed
				killProc()
				return fmt.Errorf("elastic-agent logged an unexpected error: %w", err)
			}
		case err := <-stdErr.Watch():
			if !f.allowErrs {
				// no errors allowed
				killProc()
				return fmt.Errorf("elastic-agent logged an unexpected error: %w", err)
			}
		case <-doneChan:
			if !stopping {
				// trigger the stop
				stopping = true
				_ = proc.Stop()
			}
		}
	}
}

// RunProcess runs the given process.
// The process will run until an error, or until the given timeout is reached.
func RunProcess(t *testing.T,
	lp Logger,
	ctx context.Context,
	runLength time.Duration,
	logOutput, allowErrs bool,
	processPath string, args ...string) error {
	if _, deadlineSet := ctx.Deadline(); !deadlineSet {
		t.Fatal("Context passed to RunProcess() has no deadline set.")
	}

	var err error
	var logProxy Logger
	if logOutput {
		logProxy = lp
	}
	stdOut := newLogWatcher(logProxy)
	stdErr := newLogWatcher(logProxy)

	proc, err := process.Start(
		processPath,
		process.WithContext(ctx),
		process.WithArgs(args),
		process.WithCmdOptions(attachOutErr(stdOut, stdErr)))
	if err != nil {
		return fmt.Errorf("failed to spawn %q: %w", processPath, err)
	}

	procWaitCh := proc.Wait()
	killProc := func() {
		_ = proc.Kill()
		<-procWaitCh
	}

	var doneChan <-chan time.Time
	if runLength != 0 {
		doneChan = time.After(runLength)
	}

	stopping := false
	for {
		select {
		case <-ctx.Done():
			killProc()
			return ctx.Err()
		case ps := <-procWaitCh:
			if stopping {
				return nil
			}
			return fmt.Errorf("elastic-agent exited unexpectedly with exit code: %d", ps.ExitCode())
		case err := <-stdOut.Watch():
			if !allowErrs {
				// no errors allowed
				killProc()
				return fmt.Errorf("elastic-agent logged an unexpected error: %w", err)
			}
		case err := <-stdErr.Watch():
			if !allowErrs {
				// no errors allowed
				killProc()
				return fmt.Errorf("elastic-agent logged an unexpected error: %w", err)
			}
		case <-doneChan:
			if !stopping {
				// trigger the stop
				stopping = true
				_ = proc.Stop()
			}
		}
	}
}
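// A hedged sketch of calling RunProcess directly for an arbitrary binary; the
// binary path, run length, and arguments are illustrative assumptions, and
// *testing.T doubles as the Logger here:
//
//	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
//	defer cancel()
//	err := RunProcess(t, t, ctx, 30*time.Second, true, false,
//		"/usr/local/bin/metricbeat", "run", "-e")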
// RunOtelWithClient runs the provided binary in otel mode.
//
// The Elastic Agent runs until an error is logged (unless the Fixture was
// started with `WithAllowErrors()`), the configured run length elapses, or
// the context is cancelled. Unlike `Run`, it does not enable testing mode
// (--testing-mode) and does not watch the control protocol state, so any
// provided `states` are not acted upon.
func (f *Fixture) RunOtelWithClient(ctx context.Context, states ...State) error {
	return f.executeWithClient(ctx, "otel", false, false, false, states...)
}

func (f *Fixture) executeWithClient(ctx context.Context, command string, disableEncryptedStore bool, shouldWatchState bool, enableTestingMode bool, states ...State) error {
	if _, deadlineSet := ctx.Deadline(); !deadlineSet {
		f.t.Fatal("Context passed to Fixture.Run() has no deadline set.")
	}

	if f.binaryName != "elastic-agent" {
		return errors.New("Run() can only be used with elastic-agent, use RunBeat()")
	}

	if f.installed {
		return errors.New("fixture is installed; cannot be run")
	}

	var err error
	err = f.EnsurePrepared(ctx)
	if err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var smInstance *stateMachine
	if states != nil {
		smInstance, err = newStateMachine(states)
		if err != nil {
			return err
		}
	}

	// agent-specific setup
	var agentClient client.Client
	var stateCh chan *client.AgentState
	var stateErrCh chan error

	cAddr, err := control.AddressFromPath(f.operatingSystem, f.workDir)
	if err != nil {
		return fmt.Errorf("failed to get control protocol address: %w", err)
	}

	var logProxy Logger
	if f.logOutput {
		logProxy = f.t
	}
	stdOut := newLogWatcher(logProxy)
	stdErr := newLogWatcher(logProxy)

	args := []string{command, "-e"}
	if disableEncryptedStore {
		args = append(args, "--disable-encrypted-store")
	}
	if enableTestingMode {
		args = append(args, "--testing-mode")
	}
	args = append(args, f.additionalArgs...)

	proc, err := process.Start(
		f.binaryPath(),
		process.WithContext(ctx),
		process.WithArgs(args),
		process.WithCmdOptions(attachOutErr(stdOut, stdErr)))
	if err != nil {
		return fmt.Errorf("failed to spawn %s: %w", f.binaryName, err)
	}

	if shouldWatchState {
		agentClient = client.New(client.WithAddress(cAddr))
		f.setClient(agentClient)
		defer f.setClient(nil)
		stateCh, stateErrCh = watchState(ctx, f.t, agentClient, f.connectTimout)
	}

	var doneChan <-chan time.Time
	if f.runLength != 0 {
		doneChan = time.After(f.runLength)
	}

	procWaitCh := proc.Wait()
	killProc := func() {
		_ = proc.Kill()
		<-procWaitCh
	}

	stopping := false
	for {
		select {
		case <-ctx.Done():
			killProc()
			return ctx.Err()
		case ps := <-procWaitCh:
			if stopping {
				return nil
			}
			return fmt.Errorf("elastic-agent exited unexpectedly with exit code: %d", ps.ExitCode())
		case err := <-stdOut.Watch():
			if !f.allowErrs {
				// no errors allowed
				killProc()
				return fmt.Errorf("elastic-agent logged an unexpected error: %w", err)
			}
		case err := <-stdErr.Watch():
			if !f.allowErrs {
				// no errors allowed
				killProc()
				return fmt.Errorf("elastic-agent logged an unexpected error: %w", err)
			}
		case err := <-stateErrCh:
			if !stopping {
				// Give the log watchers a second to write out the agent logs.
				// Client connection failures can happen quickly enough to prevent logging.
				time.Sleep(time.Second)
				// connection to elastic-agent failed
				killProc()
				return fmt.Errorf("elastic-agent client received unexpected error: %w", err)
			}
		case <-doneChan:
			if !stopping {
				// trigger the stop
				stopping = true
				_ = proc.Stop()
			}
		case state := <-stateCh:
			if smInstance != nil {
				cfg, cont, err := smInstance.next(ctx, state)
				if err != nil {
					killProc()
					return fmt.Errorf("state management failed with unexpected error: %w", err)
				}
				if !cont {
					if !stopping {
						// trigger the stop
						stopping = true
						_ = proc.Stop()
					}
				} else if cfg != "" {
					err := performConfigure(ctx, agentClient, cfg, 3*time.Second)
					if err != nil {
						killProc()
						return err
					}
				}
			}
		}
	}
}
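// A hedged sketch of driving Run (below) with a state machine. It assumes a
// State value whose Configure field carries the full YAML policy, per Run's
// documentation; the other State fields are elided here:
//
//	err := fix.Run(ctx, State{Configure: policyYAML})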
// Run runs the provided binary.
//
// If `states` are provided, the agent runs until each state has been reached. Once reached, the
// Elastic Agent is stopped. If at any time the Elastic Agent logs an error and the Fixture was not started
// with `WithAllowErrors()` then `Run` will exit early and return the logged error.
//
// If no `states` are provided then the Elastic Agent runs until the context is cancelled.
//
// The Elastic Agent is started in test mode (--testing-mode); this mode
// expects the initial configuration (full YAML config) via gRPC.
// This configuration should be passed in the State.Configure field.
//
// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored
// when `Run` is called.
func (f *Fixture) Run(ctx context.Context, states ...State) error {
	return f.executeWithClient(ctx, "run", true, true, true, states...)
}

// Exec provides a way of performing a subcommand on the prepared Elastic Agent binary.
func (f *Fixture) Exec(ctx context.Context, args []string, opts ...process.CmdOption) ([]byte, error) {
	err := f.EnsurePrepared(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to prepare before exec: %w", err)
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	cmd, err := f.PrepareAgentCommand(ctx, args, opts...)
	if err != nil {
		return nil, fmt.Errorf("error creating cmd: %w", err)
	}
	f.t.Logf(">> running binary with: %v", cmd.Args)

	return cmd.CombinedOutput()
}

// PrepareAgentCommand creates an exec.Cmd ready to execute an elastic-agent command.
func (f *Fixture) PrepareAgentCommand(ctx context.Context, args []string, opts ...process.CmdOption) (*exec.Cmd, error) {
	err := f.EnsurePrepared(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to prepare before exec: %w", err)
	}

	// #nosec G204 -- Not so many ways to support variadic arguments to the elastic-agent command :(
	cmd := exec.CommandContext(ctx, f.binaryPath(), args...)
	for _, o := range opts {
		if err := o(cmd); err != nil {
			return nil, fmt.Errorf("error adding opts to Exec: %w", err)
		}
	}
	return cmd, nil
}

type ExecErr struct {
	err    error
	Output []byte
}

func (e *ExecErr) Error() string {
	return e.String()
}

func (e *ExecErr) String() string {
	return fmt.Sprintf("error: %v, output: %s", e.err, e.Output)
}

func (e *ExecErr) As(target any) bool {
	switch target.(type) {
	case *ExecErr:
		target = e
		return true
	case ExecErr:
		target = *e
		return true
	default:
		return errors.As(e.err, &target)
	}
}

func (e *ExecErr) Unwrap() error {
	return e.err
}

type statusOpts struct {
	noRetry       bool
	retryTimeout  time.Duration
	retryInterval time.Duration
	cmdOptions    []process.CmdOption
}

type statusOpt func(*statusOpts)

// WithNoRetry disables the retry logic in the ExecStatus function call.
func WithNoRetry() func(opt *statusOpts) {
	return func(opt *statusOpts) {
		opt.noRetry = true
	}
}

// WithRetryTimeout adjusts the retry timeout from the default value of one minute.
func WithRetryTimeout(duration time.Duration) func(opt *statusOpts) {
	return func(opt *statusOpts) {
		opt.retryTimeout = duration
	}
}

// WithRetryInterval adjusts the retry interval from the default value of one second.
func WithRetryInterval(duration time.Duration) func(opt *statusOpts) {
	return func(opt *statusOpts) {
		opt.retryInterval = duration
	}
}

// WithCmdOptions adjusts the options of the command when status is called.
func WithCmdOptions(cmdOptions ...process.CmdOption) func(opt *statusOpts) {
	return func(opt *statusOpts) {
		opt.cmdOptions = append(opt.cmdOptions, cmdOptions...)
	}
}
// ExecStatus executes `elastic-agent status --output=json`.
//
// Returns the parsed output and the error from the execution. Keep in mind the agent exits with status 1 if it's
// unhealthy, but it still outputs the status successfully. This call does require that the Elastic Agent is running
// and communication over the control protocol is working.
//
// By default, retry logic is applied. Use WithNoRetry to disable this behavior. WithRetryTimeout and WithRetryInterval
// can be used to adjust the retry logic timing. The default retry timeout is one minute and the default retry
// interval is one second.
//
// An empty AgentStatusOutput and a non-nil error means the output could not be parsed. As long as we get some output,
// we don't return any error. It should work with any 8.6+ agent.
func (f *Fixture) ExecStatus(ctx context.Context, opts ...statusOpt) (AgentStatusOutput, error) {
	var opt statusOpts
	opt.retryTimeout = 1 * time.Minute
	opt.retryInterval = 1 * time.Second
	for _, o := range opts {
		o(&opt)
	}

	var cancel context.CancelFunc
	if opt.noRetry || opt.retryTimeout == 0 {
		ctx, cancel = context.WithCancel(ctx)
	} else {
		ctx, cancel = context.WithTimeout(ctx, opt.retryTimeout)
	}
	defer cancel()

	var lastErr error
	for {
		if ctx.Err() != nil {
			if errors.Is(ctx.Err(), context.DeadlineExceeded) && lastErr != nil {
				// return the last observed error
				return AgentStatusOutput{}, fmt.Errorf("agent status returned an error: %w", lastErr)
			}
			return AgentStatusOutput{}, fmt.Errorf("agent status failed: %w", ctx.Err())
		}

		out, err := f.Exec(ctx, []string{"status", "--output", "json"}, opt.cmdOptions...)
		status := AgentStatusOutput{}
		if uerr := json.Unmarshal(out, &status); uerr != nil {
			// an unmarshal error means that JSON was not output due to a communication error
			lastErr = fmt.Errorf("could not unmarshal agent status output: %w:\n%s", errors.Join(uerr, err), out)
		} else if status.IsZero() {
			// still not correct; try again for a successful status
			lastErr = fmt.Errorf("agent status output is empty: %w", err)
		} else {
			return status, nil
		}

		if opt.noRetry {
			return status, lastErr
		}
		sleepFor(ctx, opt.retryInterval)
	}
}

// ExecInspect executes the inspect subcommand on the prepared Elastic Agent binary.
// It returns the parsed output and the error from the execution, or an empty
// AgentInspectOutput and the unmarshalling error if it cannot unmarshal the
// output.
// It should work with any 8.6+ agent.
func (f *Fixture) ExecInspect(ctx context.Context, opts ...process.CmdOption) (AgentInspectOutput, error) {
	out, err := f.Exec(ctx, []string{"inspect"}, opts...)
	inspect := AgentInspectOutput{}
	if uerr := yaml.Unmarshal(out, &inspect); uerr != nil {
		return AgentInspectOutput{},
			fmt.Errorf("could not unmarshal agent inspect output: %w",
				errors.Join(&ExecErr{
					err:    err,
					Output: out,
				}, uerr))
	}
	return inspect, err
}

// ExecVersion executes the version subcommand on the prepared Elastic Agent binary
// with '--binary-only'. It returns the parsed YAML output.
func (f *Fixture) ExecVersion(ctx context.Context, opts ...process.CmdOption) (AgentVersionOutput, error) {
	out, err := f.Exec(ctx, []string{"version", "--binary-only", "--yaml"}, opts...)
	version := AgentVersionOutput{}
	if uerr := yaml.Unmarshal(out, &version); uerr != nil {
		return AgentVersionOutput{},
			fmt.Errorf("could not unmarshal agent version output: %w",
				errors.Join(&ExecErr{
					err:    err,
					Output: out,
				}, uerr))
	}
	return version, err
}
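// A usage sketch for ExecStatus with adjusted retry behavior (the timeout and
// interval values are illustrative):
//
//	status, err := fix.ExecStatus(ctx,
//		WithRetryTimeout(2*time.Minute),
//		WithRetryInterval(5*time.Second))
//	require.NoError(t, err)
//	t.Logf("agent state: %s", client.State(status.State))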
// ExecDiagnostics executes the agent diagnostic and returns the path to the
// zip file. If no cmd is provided, `diagnostics` will be used as the default.
// The working directory of the command will be set to a temporary directory.
// Use extractZipArchive to extract the diagnostics archive.
func (f *Fixture) ExecDiagnostics(ctx context.Context, cmd ...string) (string, error) {
	t := f.t
	t.Helper()

	if len(cmd) == 0 {
		cmd = []string{"diagnostics"}
	}

	wd := t.TempDir()
	diagnosticCmdOutput, err := f.Exec(ctx, cmd, process.WithWorkDir(wd))

	t.Logf("diagnostic command completed with output \n%q\n", diagnosticCmdOutput)
	require.NoErrorf(t, err, "error running diagnostic command: %v", err)

	t.Logf("checking directory %q for the generated diagnostics archive", wd)
	files, err := filepath.Glob(filepath.Join(wd, "elastic-agent-diagnostics-*.zip"))
	require.NoError(t, err)
	require.Len(t, files, 1)

	t.Logf("Found %q diagnostic archive.", files[0])
	return files[0], err
}

// AgentID returns the ID of the installed Elastic Agent.
func (f *Fixture) AgentID(ctx context.Context, opts ...statusOpt) (string, error) {
	status, err := f.ExecStatus(ctx, opts...)
	if err != nil {
		return "", err
	}
	return status.Info.ID, nil
}

// IsHealthy checks whether the prepared Elastic Agent reports itself as healthy.
// It returns an error if either the reported state isn't healthy or if it fails
// to fetch the current state. If the status is successfully fetched, but it
// isn't healthy, the error will contain the reported status.
// This function is compatible with any Elastic Agent version 8.6 or later.
func (f *Fixture) IsHealthy(ctx context.Context, opts ...statusOpt) error {
	status, err := f.ExecStatus(ctx, opts...)
	if err != nil {
		return fmt.Errorf("agent status returned an error: %w", err)
	}

	if status.State != int(cproto.State_HEALTHY) {
		return fmt.Errorf("agent isn't healthy, current status: %s",
			client.State(status.State)) //nolint:gosec // value will never be over 32-bit
	}
	return nil
}

// IsInstalled returns true if this fixture has been installed.
func (f *Fixture) IsInstalled() bool {
	return f.installed
}
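// A sketch combining the helpers above to collect diagnostics when an agent
// turns out to be unhealthy (the context/timeout handling is an assumption):
//
//	if err := fix.IsHealthy(ctx); err != nil {
//		archive, derr := fix.ExecDiagnostics(ctx)
//		if derr == nil {
//			fix.MoveToDiagnosticsDir(archive)
//		}
//	}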
// EnsurePrepared ensures that the fixture has been prepared.
func (f *Fixture) EnsurePrepared(ctx context.Context) error {
	if f.extractDir == "" {
		return f.Prepare(ctx)
	}
	return nil
}

func (f *Fixture) binaryPath() string {
	workDir := f.workDir
	if f.installed {
		installDir := "Agent"
		if f.installOpts != nil && f.installOpts.Namespace != "" {
			installDir = paths.InstallDirNameForNamespace(f.installOpts.Namespace)
		}
		if f.installOpts != nil && f.installOpts.BasePath != "" {
			workDir = filepath.Join(f.installOpts.BasePath, "Elastic", installDir)
		} else {
			workDir = filepath.Join(paths.DefaultBasePath, "Elastic", installDir)
		}
	}
	if f.packageFormat == "deb" || f.packageFormat == "rpm" {
		workDir = "/usr/bin"
	}
	defaultBin := "elastic-agent"
	if f.binaryName != "" {
		defaultBin = f.binaryName
	}
	binary := filepath.Join(workDir, defaultBin)
	if f.operatingSystem == "windows" {
		binary += ".exe"
	}
	return binary
}

func (f *Fixture) fetch(ctx context.Context) (string, error) {
	cache := f.getFetcherCache()
	cache.mx.Lock()
	defer cache.mx.Unlock()

	if cache.dir == "" {
		// set the directory for the artifacts for this fetcher
		// the contents are placed local to the project so that on debugging
		// of tests the same contents are used for each run
		dir, err := getCacheDir(f.caller, f.fetcher.Name())
		if err != nil {
			return "", fmt.Errorf("failed to get directory for fetcher %s: %w", f.fetcher.Name(), err)
		}
		cache.dir = dir
	}

	res, err := f.fetcher.Fetch(ctx, f.operatingSystem, f.architecture, f.version, f.packageFormat)
	if err != nil {
		return "", err
	}
	path, err := cache.fetch(ctx, f.t, res)
	if err != nil {
		return "", err
	}
	return path, nil
}

func (f *Fixture) getFetcherCache() *fetcherCache {
	fetchCacheMx.Lock()
	defer fetchCacheMx.Unlock()

	if fetchCache == nil {
		fetchCache = make(map[string]*fetcherCache)
	}

	cache, ok := fetchCache[f.fetcher.Name()]
	if !ok {
		cache = &fetcherCache{}
		fetchCache[f.fetcher.Name()] = cache
	}

	return cache
}
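// For reference, a hedged sketch of restricting the prepared components via
// Prepare; the component name is illustrative, and leaving BinaryPath empty
// keeps the spec that shipped with the fetched build (see prepareComponents
// below):
//
//	err := fix.Prepare(ctx, UsableComponent{Name: "filebeat"})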
func (f *Fixture) prepareComponents(workDir string, components ...UsableComponent) error {
	if len(components) == 0 {
		f.t.Logf("Components were not modified from the fetched artifact")
		return nil
	}

	// determine the components to keep
	keep := make(map[string]bool)
	for _, comp := range components {
		if comp.BinaryPath == "" {
			keep[comp.Name] = false
		}
	}

	// now remove all that should not be kept; removal is only
	// done by removing the spec file, no need to delete the binary
	componentsDir, err := FindComponentsDir(workDir, "")
	if err != nil {
		return err
	}
	contents, err := os.ReadDir(componentsDir)
	if err != nil {
		return fmt.Errorf("failed to read contents of components directory %s: %w", componentsDir, err)
	}
	var kept []string
	for _, fi := range contents {
		if fi.IsDir() {
			// ignore directories (only care about specification files)
			continue
		}
		name := fi.Name()
		if !strings.HasSuffix(name, ".spec.yml") {
			// ignore other files (only care about specification files)
			continue
		}
		name = strings.TrimSuffix(name, ".spec.yml")
		_, ok := keep[name]
		if !ok {
			// specification file is not marked to keep, so we remove it
			// so the Elastic Agent doesn't know how to run that component
			deleteFile := filepath.Join(componentsDir, fi.Name())
			if err := os.Remove(deleteFile); err != nil {
				return fmt.Errorf("failed to delete component specification %s: %w", deleteFile, err)
			}
		} else {
			kept = append(kept, name)
			keep[name] = true
		}
	}
	var missing []string
	for name, found := range keep {
		if !found {
			missing = append(missing, name)
		}
	}
	if len(missing) > 0 {
		return fmt.Errorf("failed to find defined usable components: %s", strings.Join(missing, ", "))
	}
	if len(kept) == 0 {
		f.t.Logf("All component specifications were removed")
	} else {
		f.t.Logf("All component specifications were removed except: %s", strings.Join(kept, ", "))
	}

	// place the components that should be set to be usable by the Elastic Agent
	var placed []string
	for _, comp := range components {
		if comp.BinaryPath == "" {
			continue
		}
		destBinary := filepath.Join(componentsDir, comp.Name)
		if f.operatingSystem == "windows" {
			destBinary += ".exe"
		}
		if err := copy.Copy(comp.BinaryPath, destBinary); err != nil {
			return fmt.Errorf("failed to copy %s to %s: %w", comp.BinaryPath, destBinary, err)
		}
		if runtime.GOOS != "windows" {
			// chown is not supported on Windows
			if err := os.Chown(destBinary, os.Geteuid(), os.Getgid()); err != nil {
				return fmt.Errorf("failed to chown %s: %w", destBinary, err)
			}
		}
		if err := os.Chmod(destBinary, 0755); err != nil {
			return fmt.Errorf("failed to chmod %s: %w", destBinary, err)
		}
		destSpec := filepath.Join(componentsDir, fmt.Sprintf("%s.spec.yml", comp.Name))
		if comp.SpecPath != "" {
			if err := copy.Copy(comp.SpecPath, destSpec); err != nil {
				return fmt.Errorf("failed to copy %s to %s: %w", comp.SpecPath, destSpec, err)
			}
		} else if comp.Spec != nil {
			if err := writeSpecFile(destSpec, comp.Spec); err != nil {
				return fmt.Errorf("failed to write specification file %s: %w", destSpec, err)
			}
		}
		placed = append(placed, comp.Name)
	}
	if len(placed) > 0 {
		f.t.Logf("Placed component specifications: %s", strings.Join(placed, ", "))
	}
	return nil
}

func (f *Fixture) setClient(c client.Client) {
	f.cMx.Lock()
	defer f.cMx.Unlock()
	f.c = c
}

// DumpProcesses dumps the currently running processes to a JSON file in the
// diagnostics directory.
func (f *Fixture) DumpProcesses(suffix string) {
	procs := getProcesses(f.t, `.*`)
	dir, err := f.DiagnosticsDir()
	if err != nil {
		f.t.Logf("failed to dump process: %s", err)
		return
	}

	filePath := filepath.Join(dir, fmt.Sprintf("%s-ProcessDump%s.json", f.FileNamePrefix(), suffix))
	fileDir := path.Dir(filePath)
	if err := os.MkdirAll(fileDir, 0777); err != nil {
		f.t.Logf("failed to dump process; failed to create directory %s: %s", fileDir, err)
		return
	}

	f.t.Logf("Dumping running processes in %s", filePath)
	file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o644)
	if err != nil {
		f.t.Logf("failed to dump process; failed to create output file %s root: %s", filePath, err)
		return
	}
	defer func(file *os.File) {
		err := file.Close()
		if err != nil {
			f.t.Logf("error closing file %s: %s", file.Name(), err)
		}
	}(file)
	err = json.NewEncoder(file).Encode(procs)
	if err != nil {
		f.t.Logf("error serializing processes: %s", err)
	}
}

// MoveToDiagnosticsDir moves a file to 'build/diagnostics', whose contents are
// available on CI if the test fails, or on the agent's 'build/diagnostics'
// if the test is run locally.
// If the file name does not start with Fixture.FileNamePrefix(), the prefix is
// added to the filename when moving.
func (f *Fixture) MoveToDiagnosticsDir(file string) {
	dir, err := f.DiagnosticsDir()
	if err != nil {
		f.t.Logf("failed to move file to diagnostics directory: %s", err)
		return
	}

	filename := filepath.Base(file)
	if !strings.HasPrefix(filename, f.FileNamePrefix()) {
		filename = fmt.Sprintf("%s-%s", f.FileNamePrefix(), filename)
	}
	destFile := filepath.Join(dir, filename)
	f.t.Logf("moving %q to %q", file, destFile)
	err = os.Rename(file, destFile)
	if err != nil {
		f.t.Logf("failed to move %q to %q: %v", file, destFile, err)
	}
}
// FileNamePrefix returns a sanitized and unique name to be used as a prefix for
// files to be kept as resources for investigation when the test fails.
func (f *Fixture) FileNamePrefix() string {
	if f.fileNamePrefix != "" {
		return f.fileNamePrefix
	}

	stamp := time.Now().Format(time.RFC3339)
	// on Windows a filename cannot contain a ':' as this collides with disk
	// labels (aka. C:\)
	stamp = strings.ReplaceAll(stamp, ":", "-")

	// Subtest names are separated by "/" characters which are not valid
	// filenames on Linux.
	sanitizedTestName := strings.ReplaceAll(f.t.Name(), "/", "-")
	prefix := fmt.Sprintf("%s-%s", sanitizedTestName, stamp)
	f.fileNamePrefix = prefix

	return f.fileNamePrefix
}

// DiagnosticsDir returns the {projectRoot}/build/diagnostics path. Files on this path
// are saved if any test fails. Use it to save files for further investigation.
func (f *Fixture) DiagnosticsDir() (string, error) {
	dir, err := findProjectRoot(f.caller)
	if err != nil {
		return "", fmt.Errorf("failed to find project root: %w", err)
	}

	diagPath := filepath.Join(dir, "build", "diagnostics")

	if err := os.MkdirAll(diagPath, 0777); err != nil {
		return "", fmt.Errorf("failed to create directory %s: %w", diagPath, err)
	}

	return diagPath, nil
}

// validateComponents ensures that the provided UsableComponents are valid.
func validateComponents(components ...UsableComponent) error {
	for idx, comp := range components {
		name := comp.Name
		if name == "" {
			name = fmt.Sprintf("component %d", idx)
		}
		err := comp.Validate()
		if err != nil {
			return fmt.Errorf("%s: %w", name, err)
		}
	}
	return nil
}

// findProjectRoot searches the project to find the go.mod file that is defined
// at the root of the project.
func findProjectRoot(caller string) (string, error) {
	dir := caller
	for {
		dir = filepath.Dir(dir)
		fi, err := os.Stat(filepath.Join(dir, "go.mod"))
		if (err == nil || os.IsExist(err)) && !fi.IsDir() {
			return dir, nil
		}
		if strings.HasSuffix(dir, string(filepath.Separator)) {
			// made it to the root directory
			return "", fmt.Errorf("unable to find golang root directory from caller path %s", caller)
		}
	}
}

// getCacheDir returns the cache directory that a fetcher uses to store its fetched artifacts.
func getCacheDir(caller string, name string) (string, error) {
	dir, err := findProjectRoot(caller)
	if err != nil {
		return "", err
	}
	cacheDir := filepath.Join(dir, ".agent-testing", name)
	if err := os.MkdirAll(cacheDir, 0755); err != nil {
		return "", fmt.Errorf("failed creating directory %s: %w", cacheDir, err)
	}
	return cacheDir, nil
}

// findAgentDataVersionDir identifies the directory that holds the agent data of the given version.
func findAgentDataVersionDir(dir, version string) (string, error) {
	dataDir := filepath.Join(dir, "data")
	agentVersions, err := os.ReadDir(dataDir)
	if err != nil {
		return "", fmt.Errorf("failed to read contents of the data directory %s: %w", dataDir, err)
	}

	var versionDir string
	for _, fi := range agentVersions {
		filename := fi.Name()
		if strings.HasPrefix(filename, "elastic-agent-") && fi.IsDir() {
			// Below we exclude the hash suffix (7 characters) of the directory to check the version
			if version != "" && filename[:len(filename)-7] != "elastic-agent-"+version {
				// version specified but it mismatches; in case of an upgrade we have multiple
				// directories, and we don't want the first one found
				continue
			}
			versionDir = filename
			break
		}
	}
	if versionDir == "" {
		return "", fmt.Errorf("failed to find versioned directory for version %q", version)
	}
	return filepath.Join(dataDir, versionDir), nil
}
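// For orientation, findAgentDataVersionDir expects the layout produced by an
// extracted agent package, roughly as follows (the version and 6-character
// hash are illustrative; the hash plus its leading dash form the 7 excluded
// characters mentioned above):
//
//	<dir>/data/elastic-agent-8.13.0-4f3b2a/
//	<dir>/data/elastic-agent-8.13.0-4f3b2a/components/
//	<dir>/data/elastic-agent-8.13.0-4f3b2a/run/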
// FindComponentsDir identifies the directory that holds the components.
func FindComponentsDir(dir, version string) (string, error) {
	versionDir, err := findAgentDataVersionDir(dir, version)
	if err != nil {
		return "", err
	}
	componentsDir := filepath.Join(versionDir, "components")
	fi, err := os.Stat(componentsDir)
	if (err != nil && !os.IsExist(err)) || !fi.IsDir() {
		return "", fmt.Errorf("failed to find components directory at %s: %w", componentsDir, err)
	}
	return componentsDir, nil
}

// FindRunDir identifies the directory that holds the run folder.
func FindRunDir(fixture *Fixture) (string, error) {
	agentWorkDir := fixture.WorkDir()
	if pf := fixture.PackageFormat(); pf == "deb" || pf == "rpm" {
		// these are hardcoded paths in packages.yml
		agentWorkDir = "/var/lib/elastic-agent"
	}
	version := fixture.Version()
	versionDir, err := findAgentDataVersionDir(agentWorkDir, version)
	if err != nil {
		return "", err
	}
	runDir := filepath.Join(versionDir, "run")
	fi, err := os.Stat(runDir)
	if (err != nil && !os.IsExist(err)) || !fi.IsDir() {
		return "", fmt.Errorf("failed to find run directory at %s: %w", runDir, err)
	}
	return runDir, nil
}

// writeSpecFile writes the specification to a specification file at the defined destination.
func writeSpecFile(dest string, spec *component.Spec) error {
	data, err := yaml.Marshal(spec)
	if err != nil {
		return err
	}
	specWriter, err := os.OpenFile(dest, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	defer specWriter.Close()
	_, err = specWriter.Write(data)
	if err != nil {
		return err
	}
	return nil
}

// attachOutErr attaches the logWatcher to stdout and stderr of the spawned process.
func attachOutErr(stdOut *logWatcher, stdErr *logWatcher) process.CmdOption {
	return func(cmd *exec.Cmd) error {
		cmd.Stdout = stdOut
		cmd.Stderr = stdErr
		return nil
	}
}

func watchState(ctx context.Context, t *testing.T, c client.Client, timeout time.Duration) (chan *client.AgentState, chan error) {
	stateCh := make(chan *client.AgentState)
	errCh := make(chan error)
	go func() {
		err := c.Connect(ctx)
		if err != nil {
			errCh <- fmt.Errorf("Connect() failed: %w", err)
			return
		}
		defer c.Disconnect()

		// StateWatch will return an error if the client is not fully connected;
		// we retry this in a loop based on the timeout to ensure that we can
		// get a valid StateWatch connection
		var sub client.ClientStateWatch
		expBackoff := backoff.NewExponentialBackOff()
		expBackoff.InitialInterval = 100 * time.Millisecond
		expBackoff.MaxElapsedTime = timeout
		expBackoff.MaxInterval = 2 * time.Second
		err = backoff.RetryNotify(
			func() error {
				var err error
				sub, err = c.StateWatch(ctx)
				return err
			},
			backoff.WithContext(expBackoff, ctx),
			func(err error, retryAfter time.Duration) {
				t.Logf("%s: StateWatch failed: %s retrying: %s",
					time.Now().UTC().Format(time.RFC3339Nano), err.Error(), retryAfter)
			},
		)
		if err != nil {
			errCh <- fmt.Errorf("StateWatch() failed: %w", err)
			return
		}
		t.Logf("%s: StateWatch started", time.Now().UTC().Format(time.RFC3339Nano))

		for {
			recv, err := sub.Recv()
			if err != nil {
				errCh <- fmt.Errorf("Recv() failed: %w", err)
				return
			}
			stateCh <- recv
		}
	}()
	return stateCh, errCh
}

func performConfigure(ctx context.Context, c client.Client, cfg string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err := c.Configure(ctx, cfg)
	if err != nil {
		return fmt.Errorf("state management failed to update configuration: %w", err)
	}
	return nil
}
// createTempDir creates a temporary directory that will be
// removed after the test passes. If the test fails, the
// directory is kept for further investigation.
//
// If the test is run with -v and fails, the temporary directory is logged.
func createTempDir(t *testing.T) string {
	tempDir, err := os.MkdirTemp("", strings.ReplaceAll(t.Name(), "/", "-"))
	if err != nil {
		t.Fatalf("failed to make temp directory: %s", err)
	}

	cleanup := func() {
		if !t.Failed() {
			if err := install.RemovePath(tempDir); err != nil {
				t.Errorf("could not remove temp dir '%s': %s", tempDir, err)
			}
		} else {
			t.Logf("Temporary directory %q preserved for investigation/debugging", tempDir)
		}
	}
	t.Cleanup(cleanup)

	return tempDir
}

type AgentStatusCollectorOutput struct {
	Status             int                                    `json:"status"`
	Error              string                                 `json:"error"`
	Timestamp          string                                 `json:"timestamp"`
	ComponentStatusMap map[string]*AgentStatusCollectorOutput `json:"components"`
}

type AgentStatusOutput struct {
	Info struct {
		ID           string `json:"id"`
		Version      string `json:"version"`
		Commit       string `json:"commit"`
		BuildTime    string `json:"build_time"`
		Snapshot     bool   `json:"snapshot"`
		PID          int32  `json:"pid"`
		Unprivileged bool   `json:"unprivileged"`
	} `json:"info"`
	State      int    `json:"state"`
	Message    string `json:"message"`
	Components []struct {
		ID      string `json:"id"`
		Name    string `json:"name"`
		State   int    `json:"state"`
		Message string `json:"message"`
		Units   []struct {
			UnitID   string `json:"unit_id"`
			UnitType int    `json:"unit_type"`
			State    int    `json:"state"`
			Message  string `json:"message"`
			Payload  struct {
				OsqueryVersion string `json:"osquery_version"`
			} `json:"payload"`
		} `json:"units"`
		VersionInfo struct {
			Name    string `json:"name"`
			Version string `json:"version"`
			Meta    struct {
				BuildTime string `json:"build_time"`
				Commit    string `json:"commit"`
			} `json:"meta"`
		} `json:"version_info,omitempty"`
	} `json:"components"`
	Collector      *AgentStatusCollectorOutput `json:"collector"`
	FleetState     int                         `json:"FleetState"`
	FleetMessage   string                      `json:"FleetMessage"`
	UpgradeDetails *details.Details            `json:"upgrade_details"`
}

func (aso *AgentStatusOutput) IsZero() bool {
	return aso.Info.ID == "" && aso.Message == "" && aso.Info.Version == ""
}

type AgentInspectOutput struct {
	Agent struct {
		Download struct {
			SourceURI string `yaml:"sourceURI"`
		} `yaml:"download"`
		Features interface{} `yaml:"features"`
		Headers  interface{} `yaml:"headers"`
		ID       string      `yaml:"id"`
		Logging  struct {
			Level string `yaml:"level"`
		} `yaml:"logging"`
		Monitoring struct {
			Enabled bool `yaml:"enabled"`
			HTTP    struct {
				Buffer  interface{} `yaml:"buffer"`
				Enabled bool        `yaml:"enabled"`
				Host    string      `yaml:"host"`
				Port    int         `yaml:"port"`
			} `yaml:"http"`
			Logs      bool   `yaml:"logs"`
			Metrics   bool   `yaml:"metrics"`
			Namespace string `yaml:"namespace"`
			UseOutput string `yaml:"use_output"`
		} `yaml:"monitoring"`
		Protection struct {
			Enabled            bool   `yaml:"enabled"`
			SigningKey         string `yaml:"signing_key"`
			UninstallTokenHash string `yaml:"uninstall_token_hash"`
		} `yaml:"protection"`
	} `yaml:"agent"`
	Fleet struct {
		AccessAPIKey string `yaml:"access_api_key"`
		Agent        struct {
			ID string `yaml:"id"`
		} `yaml:"agent"`
		Enabled   bool     `yaml:"enabled"`
		Host      string   `yaml:"host"`
		Hosts     []string `yaml:"hosts"`
		Protocol  string   `yaml:"protocol"`
		ProxyURL  string   `yaml:"proxy_url"`
		Reporting struct {
			CheckFrequencySec int `yaml:"check_frequency_sec"`
			Threshold         int `yaml:"threshold"`
		} `yaml:"reporting"`
		Ssl struct {
			Renegotiation          string   `yaml:"renegotiation"`
			VerificationMode       string   `yaml:"verification_mode"`
			Certificate            string   `yaml:"certificate"`
			CertificateAuthorities []string `yaml:"certificate_authorities"`
			Key                    string   `yaml:"key"`
			KeyPassphrasePath      string   `yaml:"key_passphrase_path"`
		} `yaml:"ssl"`
		Timeout string `yaml:"timeout"`
	} `yaml:"fleet"`
	Host struct {
		ID string `yaml:"id"`
	} `yaml:"host"`
	ID      string      `yaml:"id"`
	Inputs  interface{} `yaml:"inputs"`
	Outputs struct {
		Default struct {
			APIKey string   `yaml:"api_key"`
			Hosts  []string `yaml:"hosts"`
			Type   string   `yaml:"type"`
		} `yaml:"default"`
	} `yaml:"outputs"`
	Path struct {
		Config string `yaml:"config"`
		Data   string `yaml:"data"`
		Home   string `yaml:"home"`
		Logs   string `yaml:"logs"`
	} `yaml:"path"`
	Revision int `yaml:"revision"`
	Runtime  struct {
		Arch   string `yaml:"arch"`
		Os     string `yaml:"os"`
		Osinfo struct {
			Family  string `yaml:"family"`
			Major   int    `yaml:"major"`
			Minor   int    `yaml:"minor"`
			Patch   int    `yaml:"patch"`
			Type    string `yaml:"type"`
			Version string `yaml:"version"`
		} `yaml:"osinfo"`
	} `yaml:"runtime"`
	Signed struct {
		Data string `yaml:"data"`
	} `yaml:"signed"`
}

type AgentBinaryVersion struct {
	Version   string `yaml:"version"`
	Commit    string `yaml:"commit"`
	BuildTime string `yaml:"build_time"`
	Snapshot  bool   `yaml:"snapshot"`
}

// String returns the version string.
func (v *AgentBinaryVersion) String() string {
	s := v.Version
	if v.Snapshot {
		s += "-SNAPSHOT"
	}
	return s
}

type AgentVersionOutput struct {
	Binary AgentBinaryVersion `yaml:"binary"`
}

func sleepFor(ctx context.Context, amount time.Duration) {
	select {
	case <-ctx.Done():
	case <-time.After(amount):
	}
}