func()

in agent/commandermanager/commander.go [207:317]


// startProcess launches the commander executable and blocks until one of
// three things happens: a handshake result arrives on c.handshakeDone, the
// child process exits, or c.config.StartTimeout elapses.
//
// The child is started as
//
//	<CmdPath> run-server --pidfile <PidFile> --endpoint <Endpoint>
//
// with the agent's own environment plus AXT_AGENT_MESSAGEBUS (the central
// endpoint) and AXT_HANDSHAKE_TOKEN (the given token). Its stdout and stderr
// are forwarded line by line to the logger by background goroutines, and
// c.onExit is invoked once the process has been reaped.
//
// It returns nil when a nil handshake result is received. Otherwise it
// returns a start-process-failed error describing the cause: pipe creation or
// process start failure, a non-nil handshake result, an early process exit, a
// timeout (in which case the process is killed), or a recovered panic.
func (c *Commander) startProcess(token string) (processErr *taskerrors.CommanderError) {
	c.logger.Info("Try to start commander process")

	endpoint := buses.GetCentralEndpoint(false)
	args := []string{"run-server", "--pidfile", c.config.PidFile, "--endpoint", c.config.Endpoint.String()}
	cmd := exec.Command(c.config.CmdPath, args...)
	cmd.Env = append(cmd.Env, os.Environ()...)
	cmd.Env = append(cmd.Env, fmt.Sprintf("AXT_AGENT_MESSAGEBUS=%s", endpoint.String()))

	// Publish the token and a fresh result channel for this start attempt.
	c.handshakeToken = token
	c.handshakeDoneLock.Lock()
	c.handshakeDone = make(chan error, 1)
	c.handshakeDoneLock.Unlock()
	cmd.Env = append(cmd.Env, fmt.Sprintf("AXT_HANDSHAKE_TOKEN=%s", c.handshakeToken))
	defer func() {
		c.handshakeToken = ""
		// Close and clear the channel inside one critical section so that
		// code inspecting c.handshakeDone while holding handshakeDoneLock can
		// never observe a channel that is closed but still non-nil.
		// NOTE(review): the sending side is not visible here; this presumes
		// it checks the field under the same lock — confirm.
		c.handshakeDoneLock.Lock()
		defer c.handshakeDoneLock.Unlock()
		close(c.handshakeDone)
		c.handshakeDone = nil
	}()
	c.logger.Info(strings.Join(cmd.Args, " "))

	// Agent take over the stdout and stderr of commander process, so if agent
	// process stops commander process will receive SIGPIPE in linux. So
	// commander process in linux need to handle SIGPIPE to prevent unexpected exit.
	cmdStdout, err := cmd.StdoutPipe()
	if err != nil {
		return taskerrors.NewStartProcessFailedError(fmt.Sprintf("create stdout pipe failed, %v", err))
	}
	cmdStderr, err := cmd.StderrPipe()
	if err != nil {
		return taskerrors.NewStartProcessFailedError(fmt.Sprintf("create stderr pipe failed, %v", err))
	}
	if err = cmd.Start(); err != nil {
		return taskerrors.NewStartProcessFailedError(fmt.Sprintf("start process failed, %v", err))
	}

	c.logger = c.logger.WithField("commander-pid", cmd.Process.Pid)

	// Make sure the command is properly cleaned up if there is an error
	defer func() {
		if r := recover(); r != nil {
			// Report the recovered panic value itself; the outer err is
			// unrelated to the panic and is normally nil at this point.
			processErr = taskerrors.NewStartProcessFailedError(fmt.Sprintf("panic occurred, %v", r))
			cmdStdout.Close()
			cmdStderr.Close()
		}
	}()

	var stdouterrWaitGroup sync.WaitGroup
	// wait stdout/stderr read goroutine
	stdouterrWaitGroup.Add(2)

	exitCtx, ctxCancel := context.WithCancel(context.Background())
	go func() {
		// wait to finish reading from stdout/stderr since the stdout/stderr
		// pipe reader will be closed by the subsequent call to cmd.Wait().
		stdouterrWaitGroup.Wait()

		// Wait for the command to end.
		err := cmd.Wait()
		if err != nil {
			c.logger.WithError(err).Error("commander process exited")
		} else {
			c.logger.Info("commander process exited")
		}

		// Signal the select below (if it is still waiting) and notify the
		// owner that the process is gone.
		ctxCancel()
		c.onExit()
	}()

	timeout := time.After(c.config.StartTimeout)
	stdoutScanner := bufio.NewScanner(cmdStdout)
	stderrScanner := bufio.NewScanner(cmdStderr)
	c.logger.Info("Waiting for connection")
	go func() {
		defer func() {
			stdouterrWaitGroup.Done()
			cmdStdout.Close()
		}()
		logger := c.logger.WithField("from", "stdout")
		for stdoutScanner.Scan() {
			logger.Info(stdoutScanner.Text())
		}
		// Scan also stops on read errors and over-long lines; surface those
		// instead of silently ending the forwarding.
		if scanErr := stdoutScanner.Err(); scanErr != nil {
			logger.WithError(scanErr).Error("read commander stdout failed")
		}
	}()
	go func() {
		defer func() {
			stdouterrWaitGroup.Done()
			cmdStderr.Close()
		}()
		logger := c.logger.WithField("from", "stderr")
		for stderrScanner.Scan() {
			logger.Info(stderrScanner.Text())
		}
		if scanErr := stderrScanner.Err(); scanErr != nil {
			logger.WithError(scanErr).Error("read commander stderr failed")
		}
	}()

	select {
	case err = <-c.handshakeDone:
		if err != nil {
			processErr = taskerrors.NewStartProcessFailedError(fmt.Sprintf("handshake failed, %v", err))
		}
	case <-exitCtx.Done():
		processErr = taskerrors.NewStartProcessFailedError("process exited before handshake finishe")
	case <-timeout:
		processErr = taskerrors.NewStartProcessFailedError("timeout while commander start")
		// The process never completed the handshake in time; terminate it so
		// it does not linger. The reaping goroutine above will then run.
		if killErr := cmd.Process.Kill(); killErr != nil {
			c.logger.WithError(killErr).Error("kill commander process failed")
		}
	}

	return processErr
}