in pkg/testing/fixture.go [500:645]
// executeWithClient starts the elastic-agent binary with the given command and
// supervises it until the process exits, a failure is detected, or ctx is done.
//
// Arguments:
//   - ctx must carry a deadline; if it does not, the test is failed through
//     f.t.Fatal. A cancellable child context is derived from it and cancelled
//     when this function returns.
//   - command is the first argument passed to the binary, followed by "-e".
//   - disableEncryptedStore appends "--disable-encrypted-store" to the arguments.
//   - enableTestingMode appends "--testing-mode" to the arguments.
//   - shouldWatchState, when true, creates a control protocol client for the
//     address derived from f.operatingSystem and f.workDir, registers it on the
//     fixture for the duration of the call, and watches the agent state with it.
//   - states, when non-nil, are used to build a state machine that is fed every
//     observed agent state. The state machine can request a configuration to be
//     applied or request that the run be stopped.
//
// f.additionalArgs are appended after the flags above.
//
// It returns nil only when the process exits after this function requested a
// stop (f.runLength elapsed or the state machine finished). In every other
// case an error is returned: a precondition failure, ctx.Err() when the context
// is done, an unexpected process exit, an error reported by the log watchers
// while f.allowErrs is false, a state-watch error, a state machine error, or a
// failure to apply a configuration.
func (f *Fixture) executeWithClient(ctx context.Context, command string, disableEncryptedStore bool, shouldWatchState bool, enableTestingMode bool, states ...State) error {
	if _, deadlineSet := ctx.Deadline(); !deadlineSet {
		f.t.Fatal("Context passed to Fixture.Run() has no deadline set.")
	}

	if f.binaryName != "elastic-agent" {
		return errors.New("Run() can only be used with elastic-agent, use RunBeat()")
	}
	if f.installed {
		return errors.New("fixture is installed; cannot be run")
	}

	var err error
	err = f.EnsurePrepared(ctx)
	if err != nil {
		return err
	}

	// Derive a cancellable context so that everything started below is
	// released when this function returns, whatever the exit path.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var smInstance *stateMachine
	if states != nil {
		smInstance, err = newStateMachine(states)
		if err != nil {
			return err
		}
	}

	// agent-specific setup
	var agentClient client.Client
	var stateCh chan *client.AgentState
	var stateErrCh chan error

	cAddr, err := control.AddressFromPath(f.operatingSystem, f.workDir)
	if err != nil {
		return fmt.Errorf("failed to get control protocol address: %w", err)
	}

	// logProxy stays nil unless log output was requested on the fixture.
	var logProxy Logger
	if f.logOutput {
		logProxy = f.t
	}
	stdOut := newLogWatcher(logProxy)
	stdErr := newLogWatcher(logProxy)

	args := []string{command, "-e"}
	if disableEncryptedStore {
		args = append(args, "--disable-encrypted-store")
	}
	if enableTestingMode {
		args = append(args, "--testing-mode")
	}
	args = append(args, f.additionalArgs...)

	proc, err := process.Start(
		f.binaryPath(),
		process.WithContext(ctx),
		process.WithArgs(args),
		process.WithCmdOptions(attachOutErr(stdOut, stdErr)))
	if err != nil {
		return fmt.Errorf("failed to spawn %s: %w", f.binaryName, err)
	}

	if shouldWatchState {
		agentClient = client.New(client.WithAddress(cAddr))
		f.setClient(agentClient)
		defer f.setClient(nil)
		stateCh, stateErrCh = watchState(ctx, f.t, agentClient, f.connectTimout)
	}

	// doneChan stays nil when no run length is configured. Receiving from a
	// nil channel blocks forever, so that select case is then never chosen.
	// The same holds for stateCh and stateErrCh when shouldWatchState is false.
	var doneChan <-chan time.Time
	if f.runLength != 0 {
		doneChan = time.After(f.runLength)
	}

	procWaitCh := proc.Wait()
	// killProc kills the process and blocks until its wait channel delivers.
	// The Kill error is deliberately discarded: every caller is already on a
	// failure path and returns a more relevant error.
	killProc := func() {
		_ = proc.Kill()
		<-procWaitCh
	}

	// stopping records that this function asked the process to stop, so that
	// the subsequent process exit is treated as success rather than a failure.
	stopping := false
	for {
		select {
		case <-ctx.Done():
			killProc()
			return ctx.Err()
		case ps := <-procWaitCh:
			if stopping {
				return nil
			}
			return fmt.Errorf("elastic-agent exited unexpectedly with exit code: %d", ps.ExitCode())
		case err := <-stdOut.Watch():
			if !f.allowErrs {
				// no errors allowed
				killProc()
				return fmt.Errorf("elastic-agent logged an unexpected error: %w", err)
			}
		case err := <-stdErr.Watch():
			if !f.allowErrs {
				// no errors allowed
				killProc()
				return fmt.Errorf("elastic-agent logged an unexpected error: %w", err)
			}
		case err := <-stateErrCh:
			// State-watch errors received after a stop was requested are ignored.
			if !stopping {
				// Give the log watchers a second to write out the agent logs.
				// Client connection failures can happen quickly enough to prevent logging.
				time.Sleep(time.Second)
				// connection to elastic-agent failed
				killProc()
				return fmt.Errorf("elastic-agent client received unexpected error: %w", err)
			}
		case <-doneChan:
			if !stopping {
				// trigger the stop; the loop keeps running until the
				// process exit is observed on procWaitCh.
				stopping = true
				_ = proc.Stop()
			}
		case state := <-stateCh:
			// Without a state machine, observed states are simply drained.
			if smInstance != nil {
				cfg, cont, err := smInstance.next(ctx, state)
				if err != nil {
					killProc()
					return fmt.Errorf("state management failed with unexpected error: %w", err)
				}
				if !cont {
					if !stopping {
						// trigger the stop
						stopping = true
						_ = proc.Stop()
					}
				} else if cfg != "" {
					// The state machine asked for a new configuration to be applied.
					err := performConfigure(ctx, agentClient, cfg, 3*time.Second)
					if err != nil {
						killProc()
						return err
					}
				}
			}
		}
	}
}