func()

in pkg/testing/fixture.go [500:645]


// executeWithClient starts the elastic-agent binary with the given command and
// supervises the process until it exits, a failure is detected, or ctx is done.
//
// Preconditions checked up front:
//   - ctx must carry a deadline, otherwise the test is failed via f.t.Fatal;
//   - the fixture must be for the "elastic-agent" binary;
//   - the fixture must not be installed.
//
// The process is launched with the arguments: command, "-e",
// "--disable-encrypted-store" (when disableEncryptedStore is true),
// "--testing-mode" (when enableTestingMode is true), followed by
// f.additionalArgs.
//
// When shouldWatchState is true a control-protocol client is created, handed
// to f.setClient for the duration of the call (reset to nil on return) and
// used to watch the agent state. Each observed state is passed to the state
// machine built from states, if any were supplied; the state machine can
// request a new configuration to be applied or request the run to stop.
//
// A graceful stop is triggered when f.runLength (if non-zero) elapses or when
// the state machine reports that it should not continue. The function returns
// nil only when the process exits after such a stop was triggered. In every
// other case it returns an error: ctx.Err() when ctx is done, or an error
// describing the unexpected exit, logged error, client error, state machine
// error or configuration error.
func (f *Fixture) executeWithClient(ctx context.Context, command string, disableEncryptedStore bool, shouldWatchState bool, enableTestingMode bool, states ...State) error {
	if _, deadlineSet := ctx.Deadline(); !deadlineSet {
		f.t.Fatal("Context passed to Fixture.Run() has no deadline set.")
	}

	if f.binaryName != "elastic-agent" {
		return errors.New("Run() can only be used with elastic-agent, use RunBeat()")
	}
	if f.installed {
		return errors.New("fixture is installed; cannot be run")
	}

	if err := f.EnsurePrepared(ctx); err != nil {
		return err
	}

	// Derive a cancellable context so everything started below is torn down
	// when this function returns, whatever the reason.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var smInstance *stateMachine
	if states != nil {
		var err error
		smInstance, err = newStateMachine(states)
		if err != nil {
			return err
		}
	}

	// agent-specific setup
	var agentClient client.Client
	// Both channels stay nil unless shouldWatchState is set; receiving from a
	// nil channel blocks forever, so the matching select cases are inert then.
	var stateCh chan *client.AgentState
	var stateErrCh chan error

	cAddr, err := control.AddressFromPath(f.operatingSystem, f.workDir)
	if err != nil {
		return fmt.Errorf("failed to get control protocol address: %w", err)
	}

	// The log watchers only forward output to the test logger when log output
	// is enabled on the fixture; otherwise they are given a nil Logger.
	var logProxy Logger
	if f.logOutput {
		logProxy = f.t
	}
	stdOut := newLogWatcher(logProxy)
	stdErr := newLogWatcher(logProxy)

	args := []string{command, "-e"}
	if disableEncryptedStore {
		args = append(args, "--disable-encrypted-store")
	}
	if enableTestingMode {
		args = append(args, "--testing-mode")
	}

	args = append(args, f.additionalArgs...)

	proc, err := process.Start(
		f.binaryPath(),
		process.WithContext(ctx),
		process.WithArgs(args),
		process.WithCmdOptions(attachOutErr(stdOut, stdErr)))

	if err != nil {
		return fmt.Errorf("failed to spawn %s: %w", f.binaryName, err)
	}

	if shouldWatchState {
		agentClient = client.New(client.WithAddress(cAddr))
		f.setClient(agentClient)
		defer f.setClient(nil)
		stateCh, stateErrCh = watchState(ctx, f.t, agentClient, f.connectTimout)
	}

	// doneChan stays nil (never ready) when no run length is configured.
	// A stoppable timer is used instead of time.After so the timer is released
	// as soon as this function returns rather than lingering until it fires.
	var doneChan <-chan time.Time
	if f.runLength != 0 {
		runTimer := time.NewTimer(f.runLength)
		defer runTimer.Stop()
		doneChan = runTimer.C
	}

	procWaitCh := proc.Wait()
	// killProc forcefully terminates the process and blocks until its exit has
	// been reported on procWaitCh.
	killProc := func() {
		// Best effort: the kill error is ignored because we wait for the
		// process exit right after regardless of the outcome.
		_ = proc.Kill()
		<-procWaitCh
	}

	// stopping records that a graceful stop was requested, which turns the
	// subsequent process exit into the expected (successful) outcome.
	stopping := false
	for {
		select {
		case <-ctx.Done():
			killProc()
			return ctx.Err()
		case ps := <-procWaitCh:
			if stopping {
				return nil
			}
			return fmt.Errorf("elastic-agent exited unexpectedly with exit code: %d", ps.ExitCode())
		case err := <-stdOut.Watch():
			if !f.allowErrs {
				// no errors allowed
				killProc()
				return fmt.Errorf("elastic-agent logged an unexpected error: %w", err)
			}
		case err := <-stdErr.Watch():
			if !f.allowErrs {
				// no errors allowed
				killProc()
				return fmt.Errorf("elastic-agent logged an unexpected error: %w", err)
			}
		case err := <-stateErrCh:
			// Client errors received after a stop was triggered are ignored.
			if !stopping {
				// Give the log watchers a second to write out the agent logs.
				// Client connection failures can happen quickly enough to prevent logging.
				time.Sleep(time.Second)
				// connection to elastic-agent failed
				killProc()
				return fmt.Errorf("elastic-agent client received unexpected error: %w", err)
			}
		case <-doneChan:
			if !stopping {
				// trigger the stop
				stopping = true
				// The stop error is ignored; the loop keeps supervising the
				// process and returns once its exit or ctx.Done is observed.
				_ = proc.Stop()
			}
		case state := <-stateCh:
			// Observed states are dropped when no state machine was requested.
			if smInstance != nil {
				cfg, cont, err := smInstance.next(ctx, state)
				if err != nil {
					killProc()
					return fmt.Errorf("state management failed with unexpected error: %w", err)
				}
				if !cont {
					if !stopping {
						// trigger the stop
						stopping = true
						// The stop error is ignored; the loop keeps supervising
						// the process and returns once its exit or ctx.Done is
						// observed.
						_ = proc.Stop()
					}
				} else if cfg != "" {
					// The state machine asked for a new configuration.
					err := performConfigure(ctx, agentClient, cfg, 3*time.Second)
					if err != nil {
						killProc()
						return err
					}
				}
			}
		}
	}
}