in agent/commandermanager/commander.go [207:317]
func (c *Commander) startProcess(token string) (processErr *taskerrors.CommanderError) {
c.logger.Info("Try to start commander process")
endpoint := buses.GetCentralEndpoint(false)
args := []string{"run-server", "--pidfile", c.config.PidFile, "--endpoint", c.config.Endpoint.String()}
cmd := exec.Command(c.config.CmdPath, args...)
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, fmt.Sprintf("AXT_AGENT_MESSAGEBUS=%s", endpoint.String()))
c.handshakeToken = token
c.handshakeDoneLock.Lock()
c.handshakeDone = make(chan error, 1)
c.handshakeDoneLock.Unlock()
cmd.Env = append(cmd.Env, fmt.Sprintf("AXT_HANDSHAKE_TOKEN=%s", c.handshakeToken))
defer func() {
c.handshakeToken = ""
close(c.handshakeDone)
c.handshakeDoneLock.Lock()
defer c.handshakeDoneLock.Unlock()
c.handshakeDone = nil
}()
c.logger.Info(strings.Join(cmd.Args, " "))
// Agent take over the stdout and stderr of commander process, so if agent
// process stops commander process will receive SIGPIPE in linux. So
// commander process in linux need to handle SIGPIPE to prevent unexpected exit.
cmdStdout, err := cmd.StdoutPipe()
if err != nil {
return taskerrors.NewStartProcessFailedError(fmt.Sprintf("create stdout pipe failed, %v", err))
}
cmdStderr, err := cmd.StderrPipe()
if err != nil {
return taskerrors.NewStartProcessFailedError(fmt.Sprintf("create stderr pipe failed, %v", err))
}
if err = cmd.Start(); err != nil {
return taskerrors.NewStartProcessFailedError(fmt.Sprintf("start process failed, %v", err))
}
c.logger = c.logger.WithField("commander-pid", cmd.Process.Pid)
// Make sure the command is properly cleaned up if there is an error
defer func() {
if r := recover(); r != nil {
processErr = taskerrors.NewStartProcessFailedError(fmt.Sprintf("panic occurred, %v", err))
cmdStdout.Close()
cmdStderr.Close()
}
}()
var stdouterrWaitGroup sync.WaitGroup
// wait stdout/stderr read goroutine
stdouterrWaitGroup.Add(1)
stdouterrWaitGroup.Add(1)
exitCtx, ctxCancel := context.WithCancel(context.Background())
go func() {
// wait to finish reading from stdout/stderr since the stdout/stderr
// pipe reader will be closed by the subsequent call to cmd.Wait().
stdouterrWaitGroup.Wait()
// Wait for the command to end.
err := cmd.Wait()
if err != nil {
c.logger.WithError(err).Error("commander process exited")
} else {
c.logger.Info("commander process exited")
}
ctxCancel()
c.onExit()
}()
timeout := time.After(c.config.StartTimeout)
stdoutScanner := bufio.NewScanner(cmdStdout)
stderrScanner := bufio.NewScanner(cmdStderr)
c.logger.Info("Waiting for connection")
go func() {
defer func() {
stdouterrWaitGroup.Done()
cmdStdout.Close()
}()
logger := c.logger.WithField("from", "stdout")
for stdoutScanner.Scan() {
logger.Info(stdoutScanner.Text())
}
}()
go func() {
defer func() {
stdouterrWaitGroup.Done()
cmdStderr.Close()
}()
logger := c.logger.WithField("from", "stderr")
for stderrScanner.Scan() {
logger.Info(stderrScanner.Text())
}
}()
select {
case err = <-c.handshakeDone:
if err != nil {
processErr = taskerrors.NewStartProcessFailedError(fmt.Sprintf("handshake failed, %v", err))
}
case <-exitCtx.Done():
processErr = taskerrors.NewStartProcessFailedError("process exited before handshake finishe")
case <-timeout:
processErr = taskerrors.NewStartProcessFailedError("timeout while commander start")
cmd.Process.Kill()
}
return processErr
}