in x-pack/heartbeat/monitors/browser/synthexec/synthexec.go [125:300]
// runCmd configures cmd for a synthetics run, starts it, and returns an
// ExecMultiplexer that receives every SynthEvent the run produces.
//
// Events come from three sources, each consumed by its own goroutine: the
// command's stdout, its stderr, and a dedicated pipe (passed to the child as
// fd 3 via --outfd) that carries newline-delimited JSON results. Once the
// command exits, a final CmdStatus event is written (carrying an error if the
// command failed or was killed by the timeout) and the multiplexer is closed
// after all three readers have finished.
//
// stdinStr, when non-nil, is fed to the command's stdin. params, when
// non-empty, is JSON-encoded and passed via --params. filterJourneys adds the
// --tags / --match filters. The run is killed once the duration stored in ctx
// under SynthexecTimeout elapses.
//
// A non-nil error is returned only when the command could not be set up or
// started; in that case the returned multiplexer is nil.
func runCmd(
	ctx context.Context,
	cmd *SynthCmd,
	stdinStr *string,
	params mapstr.M,
	filterJourneys FilterJourneyConfig,
) (mpx *ExecMultiplexer, err error) {
	// Attach sysproc attrs to ensure subprocesses are properly killed
	platformCmdMutate(cmd)

	mpx = NewExecMultiplexer()

	// Setup a pipe for JSON structured output
	jsonReader, jsonWriter, err := os.Pipe()
	if err != nil {
		return nil, err
	}
	// closeJSONPipe releases both ends of the JSON pipe. It is only used on
	// error paths that return before the decoder goroutine owns the reader.
	closeJSONPipe := func() {
		_ = jsonReader.Close()
		_ = jsonWriter.Close()
	}

	// Common args
	cmd.Env = append(os.Environ(), "NODE_ENV=production")
	cmd.Args = append(cmd.Args, "--rich-events")

	if len(filterJourneys.Tags) > 0 {
		cmd.Args = append(cmd.Args, "--tags", strings.Join(filterJourneys.Tags, " "))
	}

	if filterJourneys.Match != "" {
		cmd.Args = append(cmd.Args, "--match", filterJourneys.Match)
	}

	if len(params) > 0 {
		paramsBytes, marshalErr := json.Marshal(params)
		if marshalErr != nil {
			closeJSONPipe()
			return nil, fmt.Errorf("could not marshal params: %w", marshalErr)
		}
		cmd.Args = append(cmd.Args, "--params", string(paramsBytes))
	}

	// We need to pass both files in here otherwise we get a broken pipe, even
	// though node only touches the writer
	cmd.ExtraFiles = []*os.File{jsonWriter, jsonReader}
	// Entry i of ExtraFiles becomes fd 3+i in the child, so the writer (entry 0)
	// is always fd 3.
	// see the docs for ExtraFiles in https://golang.org/pkg/os/exec/#Cmd
	cmd.Args = append(cmd.Args, "--outfd", "3")

	logp.L().Infof("Running command: %s in directory: '%s'", cmd, cmd.Dir)

	if stdinStr != nil {
		logp.L().Named(debugSelector).Debugf("Using stdin str %s", *stdinStr)
		cmd.Stdin = strings.NewReader(*stdinStr)
	}

	// Open both output pipes before starting any reader goroutine so that a
	// failure here cannot leave a goroutine blocked on a pipe that never closes.
	stdoutPipe, err := cmd.StdoutPipe()
	if err != nil {
		closeJSONPipe()
		return nil, fmt.Errorf("could not open stdout pipe: %w", err)
	}
	stderrPipe, err := cmd.StderrPipe()
	if err != nil {
		_ = stdoutPipe.Close()
		closeJSONPipe()
		return nil, fmt.Errorf("could not open stderr pipe: %w", err)
	}

	var wg sync.WaitGroup

	// Send stdout into the output
	wg.Add(1)
	go func() {
		defer wg.Done()
		if scanErr := scanToSynthEvents(stdoutPipe, stdoutToSynthEvent, mpx.writeSynthEvent); scanErr != nil {
			logp.L().Warnf("could not scan stdout events from synthetics: %s", scanErr)
		}
	}()

	// Send stderr into the output
	wg.Add(1)
	go func() {
		defer wg.Done()
		if scanErr := scanToSynthEvents(stderrPipe, stderrToSynthEvent, mpx.writeSynthEvent); scanErr != nil {
			logp.L().Warnf("could not scan stderr events from synthetics: %s", scanErr)
		}
	}()

	// Send the test results into the output
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer jsonReader.Close()

		// We don't use scanToSynthEvents here because all lines here will be JSON
		// It's more efficient to let the json decoder handle the ndjson than
		// using the scanner
		decoder := json.NewDecoder(jsonReader)
		for {
			var se SynthEvent
			offsetBefore := decoder.InputOffset()
			decodeErr := decoder.Decode(&se)
			if errors.Is(decodeErr, io.EOF) {
				return
			}
			if decodeErr != nil {
				logp.L().Warnf("error decoding json for test json results: %s", decodeErr)
				if decoder.InputOffset() == offsetBefore {
					// The decoder consumed nothing, i.e. the stream itself is
					// broken (syntax or read error). json.Decoder returns the same
					// error on every later call, so retrying would spin forever.
					// Drain the rest of the pipe so the child never blocks on a
					// full pipe, then stop.
					_, _ = io.Copy(io.Discard, jsonReader)
					return
				}
				// Otherwise a complete JSON value was consumed but did not map
				// cleanly onto SynthEvent; forward what was decoded and carry on.
			}
			mpx.writeSynthEvent(&se)
		}
	}()

	// This use of channels for results is awkward, but required for the thread locking below.
	// Both channels are buffered so the goroutine can always finish, even if
	// nobody is left to receive.
	cmdStarted := make(chan error, 1)
	cmdDone := make(chan error, 1)
	go func() {
		// We must idle this thread and ensure it is not killed while the external program is running
		// see https://github.com/golang/go/issues/27505#issuecomment-713706104 . Otherwise, the Pdeathsig
		// could cause the subprocess to die prematurely
		runtime.LockOSThread()
		defer runtime.UnlockOSThread()

		startErr := cmd.Start()
		cmdStarted <- startErr
		if startErr != nil {
			// Nothing was started, so there is nothing to wait for.
			return
		}

		cmdDone <- cmd.Wait()
	}()

	if startErr := <-cmdStarted; startErr != nil {
		logp.L().Warnf("Could not start command %s: %s", cmd, startErr)
		// Closing our write end lets the JSON decoder goroutine observe EOF and
		// exit; it closes the read end itself.
		_ = jsonWriter.Close()
		return nil, startErr
	}

	// Get timeout from parent ctx.
	// NOTE(review): if SynthexecTimeout is absent the timeout is zero and the
	// derived context expires immediately — callers are expected to set it.
	timeout, _ := ctx.Value(SynthexecTimeout).(time.Duration)
	ctx, cancel := context.WithTimeout(ctx, timeout)
	go func() {
		<-ctx.Done()

		// ProcessState can be null if it hasn't reported back yet
		if cmd.ProcessState != nil && cmd.ProcessState.Exited() {
			return
		}

		if killErr := cmd.Process.Kill(); killErr != nil {
			logp.L().Warnf("could not kill synthetics process: %s", killErr)
		}
	}()

	// Close mpx after the process is done and all events have been sent / consumed
	go func() {
		waitErr := <-cmdDone
		// The child has exited; closing our copy of the write end lets the JSON
		// decoder goroutine reach EOF.
		_ = jsonWriter.Close()

		exitCode := cmd.ProcessState.ExitCode()
		logp.L().Infof("Command has completed(%d): %s", exitCode, cmd)

		var cmdError *SynthError
		if waitErr != nil {
			// err could be generic or it could have been killed by context timeout, log and check context
			// to decide which error to stream
			logp.L().Warnf("Error executing command '%s' (%d): %s", cmd, exitCode, waitErr)

			if errors.Is(ctx.Err(), context.DeadlineExceeded) {
				cmdError = ECSErrToSynthError(ecserr.NewCmdTimeoutStatusErr(timeout, cmd.String()))
			} else {
				cmdError = ECSErrToSynthError(ecserr.NewBadCmdStatusErr(exitCode, cmd.String()))
			}
		}

		mpx.writeSynthEvent(&SynthEvent{
			Type:                 CmdStatus,
			Error:                cmdError,
			TimestampEpochMicros: float64(time.Now().UnixMicro()),
		})

		wg.Wait()
		mpx.Close()
		// Releases the timeout context and wakes the kill goroutine, which then
		// sees the exited process and returns.
		cancel()
	}()

	return mpx, nil
}