func runCmd()

in x-pack/heartbeat/monitors/browser/synthexec/synthexec.go [125:300]


// runCmd starts the synthetics command described by cmd and streams everything
// it produces into a freshly created ExecMultiplexer.
//
// Three sources are forwarded as SynthEvents: the process's stdout, its stderr,
// and an extra pipe (exposed to the child as fd 3 via --outfd) carrying
// newline-delimited JSON results. After the process exits a final CmdStatus
// event is written (carrying an error if the command failed or timed out), and
// the multiplexer is closed once all readers have finished.
//
// ctx may carry a time.Duration under the SynthexecTimeout key; when it holds a
// positive value the process is killed after that long. stdinStr, when non-nil,
// is fed to the process on stdin. params, when non-empty, is passed as JSON via
// --params. filterJourneys is translated to --tags / --match.
//
// The returned multiplexer is nil whenever err is non-nil. Errors are returned
// only for setup/start failures; runtime failures of the command are reported
// through the CmdStatus event instead.
func runCmd(
	ctx context.Context,
	cmd *SynthCmd,
	stdinStr *string,
	params mapstr.M,
	filterJourneys FilterJourneyConfig,
) (mpx *ExecMultiplexer, err error) {
	// Attach sysproc attrs to ensure subprocesses are properly killed
	platformCmdMutate(cmd)

	// Common args
	cmd.Env = append(os.Environ(), "NODE_ENV=production")
	cmd.Args = append(cmd.Args, "--rich-events")

	if len(filterJourneys.Tags) > 0 {
		cmd.Args = append(cmd.Args, "--tags", strings.Join(filterJourneys.Tags, " "))
	}

	if filterJourneys.Match != "" {
		cmd.Args = append(cmd.Args, "--match", filterJourneys.Match)
	}

	if len(params) > 0 {
		paramsBytes, marshalErr := json.Marshal(params)
		if marshalErr != nil {
			// Refuse to run with a silently empty --params value.
			return nil, fmt.Errorf("could not marshal params: %w", marshalErr)
		}
		cmd.Args = append(cmd.Args, "--params", string(paramsBytes))
	}

	mpx = NewExecMultiplexer()

	// Setup a pipe for JSON structured output
	jsonReader, jsonWriter, err := os.Pipe()
	if err != nil {
		return nil, fmt.Errorf("could not open json output pipe: %w", err)
	}
	// closeJSONPipe releases both pipe ends when setup fails before any
	// goroutine has taken ownership of them.
	closeJSONPipe := func() {
		_ = jsonWriter.Close()
		_ = jsonReader.Close()
	}

	// We need to pass both files in here otherwise we get a broken pipe, even
	// though node only touches the writer
	cmd.ExtraFiles = []*os.File{jsonWriter, jsonReader}
	// Entry i of ExtraFiles becomes fd 3+i in the child, so the writer (entry 0)
	// is fd 3; see the docs for ExtraFiles in https://golang.org/pkg/os/exec/#Cmd
	cmd.Args = append(cmd.Args, "--outfd", "3")

	logp.L().Infof("Running command: %s in directory: '%s'", cmd, cmd.Dir)

	if stdinStr != nil {
		logp.L().Named(debugSelector).Debugf("Using stdin str %s", *stdinStr)
		cmd.Stdin = strings.NewReader(*stdinStr)
	}

	// Acquire both output pipes before launching any reader goroutine so that a
	// failure here cannot strand a goroutine on a pipe that will never be closed.
	// NOTE(review): os/exec documents that Wait closes these pipes once the
	// process exits, so output still unread at that moment may be dropped —
	// confirm this is acceptable.
	stdoutPipe, err := cmd.StdoutPipe()
	if err != nil {
		closeJSONPipe()
		return nil, fmt.Errorf("could not open stdout pipe: %w", err)
	}
	stderrPipe, err := cmd.StderrPipe()
	if err != nil {
		_ = stdoutPipe.Close()
		closeJSONPipe()
		return nil, fmt.Errorf("could not open stderr pipe: %w", err)
	}

	var wg sync.WaitGroup

	// Send stdout into the output
	wg.Add(1)
	go func() {
		defer wg.Done()
		if scanErr := scanToSynthEvents(stdoutPipe, stdoutToSynthEvent, mpx.writeSynthEvent); scanErr != nil {
			logp.L().Warnf("could not scan stdout events from synthetics: %s", scanErr)
		}
	}()

	// Send stderr into the output
	wg.Add(1)
	go func() {
		defer wg.Done()
		if scanErr := scanToSynthEvents(stderrPipe, stderrToSynthEvent, mpx.writeSynthEvent); scanErr != nil {
			logp.L().Warnf("could not scan stderr events from synthetics: %s", scanErr)
		}
	}()

	// Send the test results into the output
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer jsonReader.Close()

		// We don't use scanToSynthEvents here because all lines here will be JSON
		// It's more efficient to let the json decoder handle the ndjson than
		// using the scanner
		decoder := json.NewDecoder(jsonReader)
		for {
			var se SynthEvent
			decErr := decoder.Decode(&se)
			if errors.Is(decErr, io.EOF) {
				return
			}
			if decErr != nil {
				logp.L().Warnf("error decoding json for test json results: %s", decErr)

				var typeErr *json.UnmarshalTypeError
				if errors.As(decErr, &typeErr) {
					// The value was well-formed JSON of the wrong shape. The
					// decoder has already moved past it, so drop this event and
					// keep reading.
					continue
				}

				// Syntax and read errors are remembered by the decoder and
				// returned from every later Decode call, so retrying would
				// never terminate. Stop decoding, but keep draining the pipe
				// so the child cannot block on a full pipe buffer.
				_, _ = io.Copy(io.Discard, jsonReader)
				return
			}

			mpx.writeSynthEvent(&se)
		}
	}()

	// This use of channels for results is awkward, but required for the thread locking below
	cmdStarted := make(chan error)
	cmdDone := make(chan error)
	// procExited is closed as soon as cmd.Wait returns, letting the timeout
	// goroutine observe process exit without touching cmd.ProcessState.
	procExited := make(chan struct{})
	go func() {
		// We must idle this thread and ensure it is not killed while the external program is running
		// see https://github.com/golang/go/issues/27505#issuecomment-713706104 . Otherwise, the Pdeathsig
		// could cause the subprocess to die prematurely
		runtime.LockOSThread()
		defer runtime.UnlockOSThread()

		startErr := cmd.Start()
		cmdStarted <- startErr
		if startErr != nil {
			// Nothing was started, so there is nothing to wait for and nobody
			// will ever receive from cmdDone.
			return
		}

		waitErr := cmd.Wait()
		close(procExited)
		cmdDone <- waitErr
	}()

	if err = <-cmdStarted; err != nil {
		// No child holds the write end; closing ours delivers EOF to the JSON
		// reader goroutine, which then closes the read end itself.
		_ = jsonWriter.Close()
		logp.L().Warnf("Could not start command %s: %s", cmd, err)
		return nil, err
	}

	// Get timeout from parent ctx. Without a positive timeout only the parent's
	// own cancellation/deadline applies; a zero timeout would otherwise expire
	// immediately and kill the process right after it started.
	timeout, hasTimeout := ctx.Value(SynthexecTimeout).(time.Duration)
	var cancel context.CancelFunc
	if hasTimeout && timeout > 0 {
		ctx, cancel = context.WithTimeout(ctx, timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}

	// Kill the process if the context ends before the process does.
	go func() {
		select {
		case <-procExited:
			return
		case <-ctx.Done():
		}

		// The process may exit between the select and the kill; that is not
		// worth a warning.
		if killErr := cmd.Process.Kill(); killErr != nil && !errors.Is(killErr, os.ErrProcessDone) {
			logp.L().Warnf("could not kill synthetics process: %s", killErr)
		}
	}()

	// Close mpx after the process is done and all events have been sent / consumed
	go func() {
		// Release the context's resources only after ctx.Err() has been
		// inspected below and the multiplexer has been closed.
		defer cancel()

		waitErr := <-cmdDone
		// Closing our write end lets the JSON reader see EOF now that the child
		// (the only other holder) has exited.
		_ = jsonWriter.Close()

		exitCode := cmd.ProcessState.ExitCode()
		logp.L().Infof("Command has completed(%d): %s", exitCode, cmd)

		var cmdError *SynthError
		if waitErr != nil {
			// err could be generic or it could have been killed by context timeout, log and check context
			// to decide which error to stream
			logp.L().Warnf("Error executing command '%s' (%d): %s", cmd, exitCode, waitErr)

			if errors.Is(ctx.Err(), context.DeadlineExceeded) {
				cmdError = ECSErrToSynthError(ecserr.NewCmdTimeoutStatusErr(timeout, cmd.String()))
			} else {
				cmdError = ECSErrToSynthError(ecserr.NewBadCmdStatusErr(exitCode, cmd.String()))
			}
		}

		mpx.writeSynthEvent(&SynthEvent{
			Type:                 CmdStatus,
			Error:                cmdError,
			TimestampEpochMicros: float64(time.Now().UnixMicro()),
		})

		wg.Wait()
		mpx.Close()
	}()

	return mpx, nil
}