func()

in clusterloader2/pkg/measurement/common/pod_command.go [296:408]


// runCommandInPod runs the named command described by params inside pod (in
// the container configured on the measurement) through the pods/exec
// subresource and returns a runCommandResult describing the outcome.
//
// The measurement's Execs counter is always incremented. NonZeroRCs is
// incremented when the command exits with a non-zero status, and Timeouts is
// incremented when params.Timeout elapses before the command finishes.
//
// Failures are reported through the returned result rather than a Go error:
//   - ExecError is set when the executor cannot be created, or when the stream
//     fails for a reason other than a non-zero exit code. In the latter case,
//     any stdout/stderr captured before the failure is recorded as well.
//   - ExitCode is set from an exec.CodeExitError.
//   - HitTimeout is set when the timeout fires before the command returns. In
//     that case stdout/stderr are left empty: the stream goroutine is still
//     running and still owns the output buffers, so they cannot be read
//     safely.
func (p *podPeriodicCommandMeasurement) runCommandInPod(
	pod *v1.Pod, params *podPeriodicCommandMeasurementCommandParams,
) *runCommandResult {
	klog.V(4).Infof(
		"%s: running named command %s in pod %s/%s",
		podPeriodicCommandMeasurementName, params.Name, pod.Namespace, pod.Name,
	)

	p.statsLock.Lock()
	p.stats.Execs++
	p.statsLock.Unlock()

	result := &runCommandResult{
		Name:       params.Name,
		Command:    params.Command,
		Timeout:    params.Timeout.String(),
		ExitCode:   0,
		HitTimeout: false,
	}

	req := p.clientset.CoreV1().RESTClient().
		Post().
		Namespace(pod.Namespace).
		Resource("pods").
		Name(pod.Name).
		SubResource("exec").
		VersionedParams(&v1.PodExecOptions{
			Container: p.params.Container,
			Command:   params.Command,
			Stdin:     false,
			Stdout:    true,
			Stderr:    true,
			TTY:       false,
		}, scheme.ParameterCodec)

	executor, err := remotecommand.NewSPDYExecutor(p.restConfig, "POST", req.URL())
	if err != nil {
		result.ExecError = err.Error()

		return result
	}

	// streamOutcome carries everything produced by executor.Stream back to this
	// goroutine. The output buffers live only inside the stream goroutine and are
	// converted to strings after Stream has returned. This means they are never
	// read while still being written; bytes.Buffer is not safe for concurrent use.
	type streamOutcome struct {
		err    error
		stdout string
		stderr string
	}
	// Buffered so the stream goroutine can always deliver its outcome and exit,
	// even when nobody receives it because the timeout fired first.
	outcomeChan := make(chan streamOutcome, 1)

	// The logic used here to start the executor and the timeout timer isn't super precise, but
	// it is good enough for this use case. It is ok that the timeout timer is started after the
	// executor, since we still guarantee that the timeout is at least the configured value.
	result.StartTime = time.Now()

	go func() {
		var stdoutBuf, stderrBuf bytes.Buffer
		streamErr := executor.Stream(remotecommand.StreamOptions{
			Stdout: &stdoutBuf,
			Stderr: &stderrBuf,
		})
		outcomeChan <- streamOutcome{
			err:    streamErr,
			stdout: stdoutBuf.String(),
			stderr: stderrBuf.String(),
		}
	}()

	// An explicit timer (rather than time.After) lets us release it as soon as
	// the command returns.
	timer := time.NewTimer(params.Timeout)
	defer timer.Stop()

	// Two different cases: (1) if the command returns before the timeout, and (2) if the timeout
	// triggers before the command is done.
	// The value result.EndTime is set in both cases.
	// If the timeout triggers, then the command isn't actually cancelled. This logic isn't available until
	// client-go version 0.26 (see Executor.StreamWithContext).
	select {
	case outcome := <-outcomeChan:
		result.EndTime = time.Now()
		result.stdout = outcome.stdout
		result.stderr = outcome.stderr

		if outcome.err != nil {
			switch e := outcome.err.(type) {
			case exec.CodeExitError:
				result.ExitCode = e.ExitStatus()

				p.statsLock.Lock()
				p.stats.NonZeroRCs++
				p.statsLock.Unlock()

				klog.V(2).Infof(
					"%s: warning: non-zero exit code %d for named command %s in pod %s/%s",
					podPeriodicCommandMeasurementName, result.ExitCode, params.Name, pod.Namespace, pod.Name,
				)
			default:
				result.ExecError = outcome.err.Error()
				return result
			}
		}
	case <-timer.C:
		result.EndTime = time.Now()
		result.HitTimeout = true

		p.statsLock.Lock()
		p.stats.Timeouts++
		p.statsLock.Unlock()

		klog.V(2).Infof(
			"%s: warning: hit timeout of %s for named command %s in pod %s/%s",
			podPeriodicCommandMeasurementName, params.Timeout.String(), params.Name, pod.Namespace, pod.Name,
		)
	}

	klog.V(4).Infof(
		"%s: finished running named command %s in pod %s/%s",
		podPeriodicCommandMeasurementName, params.Name, pod.Namespace, pod.Name,
	)

	return result
}