func()

in plugin/commander/container/docker/processor.go [79:171]


func (p *DockerProcessor) SyncRun(
	stdoutWriter io.Writer,
	stderrWriter io.Writer,
	stdinReader io.Reader) (int, int, error) {
	// 1. Create an exec instance
	compiledCommand := []string{"/bin/sh", "-c", p.CommandContent}
	execConfig := types.ExecConfig{
		User:         p.Username,
		Tty:          false,
		AttachStdin:  stdinReader != nil,
		AttachStderr: stderrWriter != nil,
		AttachStdout: stdoutWriter != nil,
		WorkingDir:   p.WorkingDirectory,
		Cmd:          compiledCommand,
	}
	execution, err := createExec(p.client, p.container.ID, execConfig, defaultTimeout)
	if err != nil {
		return 1, process.Fail, taskerrors.NewContainerRuntimeInternalError(err)
	}

	// 2. Start the created exec instance and get a hijacked response stream
	execStartConfig := types.ExecStartCheck{
		Detach: false,
		Tty:    false,
	}
	hijackedResponse, err := startAndAttachExec(p.client, execution.ID, execStartConfig, defaultTimeout)
	if err != nil {
		return 1, process.Fail, taskerrors.NewContainerRuntimeInternalError(err)
	}
	defer hijackedResponse.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(p.Timeout)*time.Second)
	defer cancel()
	p.cancel = cancel

	// Run streamer for stdin/stdout/stderr on hijacked connection concurrently,
	// and catch session termination through channel
	streamed := make(chan error, 1)
	go func() {
		streamed <- streamHijacked(ctx, hijackedResponse, stdoutWriter, stderrWriter, stdinReader)
	}()

	// Wait for command finished, or timeout
	select {
	case <-ctx.Done():
		if ctxErr := ctx.Err(); ctxErr != nil {
			if ctxErr == context.DeadlineExceeded {
				return 1, process.Timeout, errors.New("timeout")
			}
		}
	case err := <-streamed:
		if err != nil {
			return 1, process.Fail, taskerrors.NewContainerRuntimeInternalError(err)
		}
	}

	// Determine process state after exec session terminated.
	// As https://github.com/Mirantis/cri-dockerd/blob/17229a014b98b47966f98a16d4dd9faa5230a31f/core/exec.go#L153-L154
	// says, try to inspect an exec session a few times for the newest state.
	var finalErr error
	var exitCode int
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for retries := 0; ; {
		inspection, err := inspectExec(p.client, execution.ID, defaultTimeout)
		if err != nil {
			finalErr = err
			break
		}

		if !inspection.Running {
			exitCode = inspection.ExitCode
			break
		}

		retries++
		if retries == maxInspectionRetries {
			log.GetLogger().WithFields(logrus.Fields{
				"containerId":   p.ContainerId,
				"containerName": p.ContainerName,
				"execId":        execution.ID,
			}).WithError(ErrInconsistentExecProcessState).Errorln("Failed to conclude process state after exec session")
			return 1, process.Fail, taskerrors.NewContainerRuntimeInternalError(ErrInconsistentExecProcessState)
		}

		<-ticker.C
	}
	if finalErr != nil {
		return 1, process.Fail, taskerrors.NewContainerRuntimeInternalError(finalErr)
	}

	return exitCode, process.Success, nil
}