agent/commandermanager/commander.go (353 lines of code) (raw):
package commandermanager
import (
"bufio"
"context"
"fmt"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/aliyun/aliyun_assist_client/agent/commandermanager/processutil"
"github.com/aliyun/aliyun_assist_client/agent/log"
"github.com/aliyun/aliyun_assist_client/agent/taskengine/taskerrors"
pb "github.com/aliyun/aliyun_assist_client/interprocess/commander/agrpc"
commander_client "github.com/aliyun/aliyun_assist_client/interprocess/commander/client"
"github.com/aliyun/aliyun_assist_client/interprocess/messagebus/buses"
messagebus_client "github.com/aliyun/aliyun_assist_client/interprocess/messagebus/client"
"github.com/aliyun/aliyun_assist_client/thirdparty/sirupsen/logrus"
"github.com/shirou/gopsutil/v3/process"
"google.golang.org/grpc"
)
type Commander struct {
config *CommanderConfig
l sync.Mutex
logger logrus.FieldLogger
client *commander_client.CommanderClient
clientLock sync.Mutex
exitFunc map[string]func()
onExitLock sync.Mutex
handshakeDone chan error
handshakeDoneLock sync.Mutex
handshakeToken string
}
type CommanderConfig struct {
CommanderName string
CmdPath string
PidFile string
Endpoint buses.Endpoint
Version string
ApiVersion string
// timeout to wait commander process start and finish handshake
StartTimeout time.Duration
// timeout to finish handshake with exist commander process
AttachTimeout time.Duration
// timeout to connect commander's grpc server
DialTimeout time.Duration
}
func NewCommander(config *CommanderConfig) *Commander {
c := &Commander{
config: config,
logger: log.GetLogger().WithField("commander", config.CommanderName),
}
return c
}
// RegisterExitHandler register callback functions which be called after
// commander process exit
func (c *Commander) RegisterExitHandler(name string, f func()) {
c.onExitLock.Lock()
defer c.onExitLock.Unlock()
if c.exitFunc == nil {
c.exitFunc = make(map[string]func())
}
c.exitFunc[name] = f
}
func (c *Commander) DeRegisterExitHandler(name string) {
c.onExitLock.Lock()
defer c.onExitLock.Unlock()
delete(c.exitFunc, name)
}
// CmdPath return commander's executable path
func (c *Commander) CmdPath() string {
return c.config.CmdPath
}
// // CmdPath return commander's version
func (c *Commander) Version() string {
return c.config.Version
}
// Client return commander's grpc client
func (c *Commander) Client() (*commander_client.CommanderClient, *taskerrors.CommanderError) {
c.clientLock.Lock()
defer c.clientLock.Unlock()
if c.client == nil {
return nil, taskerrors.NewClientNotCreatedError("client is nil")
}
return c.client, nil
}
// IsRunning check if commander is running
func (c *Commander) IsRunning() bool {
c.clientLock.Lock()
defer c.clientLock.Unlock()
if c.client == nil {
return false
}
return c.ping() == nil
}
func (c *Commander) ping() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
resp, err := c.client.Ping(ctx)
if err != nil {
c.logger.Error("Ping failed: ", err)
return err
} else if resp.Status.StatusCode != 0 {
c.logger.Error("Ping failed: ", resp.Status.ErrMessage)
return fmt.Errorf(resp.Status.ErrMessage)
}
return nil
}
// CreateClient create a new grpc client
func (c *Commander) CreateClient(endpoint buses.Endpoint) (*commander_client.CommanderClient, *taskerrors.CommanderError) {
c.clientLock.Lock()
defer c.clientLock.Unlock()
conn, err := messagebus_client.ConnectWithTimeout(c.logger, endpoint, c.config.DialTimeout)
if err != nil {
return nil, taskerrors.NewCreateClientFailedError(err.Error())
}
client := commander_client.NewClient(conn)
if c.client != nil {
c.client.Close()
}
c.client = client
return c.client, nil
}
// Start start commander
func (c *Commander) Start(token string) *taskerrors.CommanderError {
c.l.Lock()
defer c.l.Unlock()
if p, alive := c.checkProcessAlive(); alive {
if err := c.attachProcess(int(p.Pid), token); err != nil {
c.logger.Error("Attach to existed process failed: ", err)
if err := p.Kill(); err != nil {
c.logger.Error("Kill existed process failed: ", err)
}
if err := c.startProcess(token); err != nil {
c.logger.Error("Start process failed: ", err)
return err
} else {
c.logger.Info("Process started!")
}
} else {
c.logger.Info("Attached to existed process!")
}
} else {
if err := c.startProcess(token); err != nil {
c.logger.Error("Start process failed: ", err)
return err
} else {
c.logger.Info("Process started!")
}
}
return nil
}
// checkProcessAlive check if commander process is running
func (c *Commander) checkProcessAlive() (*process.Process, bool) {
content, err := os.ReadFile(c.config.PidFile)
if err != nil {
return nil, false
}
pid, err := strconv.Atoi(strings.TrimSpace(string(content)))
if err != nil {
return nil, false
}
p, err := process.NewProcess(int32(pid))
if err != nil {
return nil, false
}
cmdlineSlice, err := p.CmdlineSlice()
c.logger.Info("commander cmdline: ", cmdlineSlice)
if err != nil || len(cmdlineSlice) == 0 || strings.TrimSpace(cmdlineSlice[0]) != c.config.CmdPath {
return nil, false
}
return p, true
}
// startProcess start commander process
func (c *Commander) startProcess(token string) (processErr *taskerrors.CommanderError) {
c.logger.Info("Try to start commander process")
endpoint := buses.GetCentralEndpoint(false)
args := []string{"run-server", "--pidfile", c.config.PidFile, "--endpoint", c.config.Endpoint.String()}
cmd := exec.Command(c.config.CmdPath, args...)
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, fmt.Sprintf("AXT_AGENT_MESSAGEBUS=%s", endpoint.String()))
c.handshakeToken = token
c.handshakeDoneLock.Lock()
c.handshakeDone = make(chan error, 1)
c.handshakeDoneLock.Unlock()
cmd.Env = append(cmd.Env, fmt.Sprintf("AXT_HANDSHAKE_TOKEN=%s", c.handshakeToken))
defer func() {
c.handshakeToken = ""
close(c.handshakeDone)
c.handshakeDoneLock.Lock()
defer c.handshakeDoneLock.Unlock()
c.handshakeDone = nil
}()
c.logger.Info(strings.Join(cmd.Args, " "))
// Agent take over the stdout and stderr of commander process, so if agent
// process stops commander process will receive SIGPIPE in linux. So
// commander process in linux need to handle SIGPIPE to prevent unexpected exit.
cmdStdout, err := cmd.StdoutPipe()
if err != nil {
return taskerrors.NewStartProcessFailedError(fmt.Sprintf("create stdout pipe failed, %v", err))
}
cmdStderr, err := cmd.StderrPipe()
if err != nil {
return taskerrors.NewStartProcessFailedError(fmt.Sprintf("create stderr pipe failed, %v", err))
}
if err = cmd.Start(); err != nil {
return taskerrors.NewStartProcessFailedError(fmt.Sprintf("start process failed, %v", err))
}
c.logger = c.logger.WithField("commander-pid", cmd.Process.Pid)
// Make sure the command is properly cleaned up if there is an error
defer func() {
if r := recover(); r != nil {
processErr = taskerrors.NewStartProcessFailedError(fmt.Sprintf("panic occurred, %v", err))
cmdStdout.Close()
cmdStderr.Close()
}
}()
var stdouterrWaitGroup sync.WaitGroup
// wait stdout/stderr read goroutine
stdouterrWaitGroup.Add(1)
stdouterrWaitGroup.Add(1)
exitCtx, ctxCancel := context.WithCancel(context.Background())
go func() {
// wait to finish reading from stdout/stderr since the stdout/stderr
// pipe reader will be closed by the subsequent call to cmd.Wait().
stdouterrWaitGroup.Wait()
// Wait for the command to end.
err := cmd.Wait()
if err != nil {
c.logger.WithError(err).Error("commander process exited")
} else {
c.logger.Info("commander process exited")
}
ctxCancel()
c.onExit()
}()
timeout := time.After(c.config.StartTimeout)
stdoutScanner := bufio.NewScanner(cmdStdout)
stderrScanner := bufio.NewScanner(cmdStderr)
c.logger.Info("Waiting for connection")
go func() {
defer func() {
stdouterrWaitGroup.Done()
cmdStdout.Close()
}()
logger := c.logger.WithField("from", "stdout")
for stdoutScanner.Scan() {
logger.Info(stdoutScanner.Text())
}
}()
go func() {
defer func() {
stdouterrWaitGroup.Done()
cmdStderr.Close()
}()
logger := c.logger.WithField("from", "stderr")
for stderrScanner.Scan() {
logger.Info(stderrScanner.Text())
}
}()
select {
case err = <-c.handshakeDone:
if err != nil {
processErr = taskerrors.NewStartProcessFailedError(fmt.Sprintf("handshake failed, %v", err))
}
case <-exitCtx.Done():
processErr = taskerrors.NewStartProcessFailedError("process exited before handshake finishe")
case <-timeout:
processErr = taskerrors.NewStartProcessFailedError("timeout while commander start")
cmd.Process.Kill()
}
return processErr
}
// attachProcess attach running commander process
func (c *Commander) attachProcess(pid int, token string) *taskerrors.CommanderError {
c.logger.Info("Try attach commander process")
errMsg := []string{}
conn, err := messagebus_client.ConnectWithTimeout(c.logger, c.config.Endpoint, c.config.DialTimeout)
if err != nil {
c.logger.WithError(err).Error("Connect failed, try to restart commander grpc server")
errMsg = append(errMsg, fmt.Sprintf("connection failed, %v", err))
if err := processutil.RestartServer(pid); err != nil {
c.logger.WithError(err).Error("Restart commander grpc server failed")
errMsg = append(errMsg, fmt.Sprintf("restart server failed, %v", err))
}
time.Sleep(time.Second)
conn, err = messagebus_client.ConnectWithTimeout(c.logger, c.config.Endpoint, c.config.DialTimeout)
if err != nil {
c.logger.WithError(err).Error("Retry connect failed")
errMsg = append(errMsg, fmt.Sprintf("retry connection failed, %v", err))
return taskerrors.NewAttachProcessFailedError(errMsg...)
}
}
defer conn.Close()
if err := c.handshake(conn, token); err != nil {
return err
}
c.logger = c.logger.WithField("commander-pid", pid)
go func() {
processutil.PidWait(pid)
c.onExit()
}()
return nil
}
func (c *Commander) handshake(conn *grpc.ClientConn, token string) *taskerrors.CommanderError {
client := commander_client.NewClient(conn)
defer client.Close()
c.handshakeToken = token
c.handshakeDoneLock.Lock()
c.handshakeDone = make(chan error, 1)
c.handshakeDoneLock.Unlock()
defer func() {
c.handshakeToken = ""
close(c.handshakeDone)
c.handshakeDoneLock.Lock()
defer c.handshakeDoneLock.Unlock()
c.handshakeDone = nil
}()
agentEndpoint := buses.GetCentralEndpoint(false)
req := &pb.HandshakeReq{
Endpoint: agentEndpoint.String(),
HandshakeToken: c.handshakeToken,
}
ctx, cancel := context.WithTimeout(context.Background(), c.config.AttachTimeout)
defer cancel()
resp, err := client.Client.Handshake(ctx, req)
if err != nil {
return taskerrors.NewAttachProcessFailedError(fmt.Sprintf("handshake request fail, %v", err))
} else if resp.Status.StatusCode != 0 {
return taskerrors.NewAttachProcessFailedError(fmt.Sprintf("handshake request fail, %s", resp.Status.ErrMessage))
}
select {
case <-ctx.Done():
return taskerrors.NewAttachProcessFailedError("attach commander timeout")
case err := <-c.handshakeDone:
if err != nil {
return taskerrors.NewStartProcessFailedError(fmt.Sprintf("handshake failed, %v", err))
}
}
return nil
}
func (c *Commander) onExit() {
c.logger.Info("commander process exited")
c.logger = c.logger.WithField("commander-pid", "nil")
// reset c.conn
c.clientLock.Lock()
if c.client != nil {
c.client.Close()
}
c.clientLock.Unlock()
c.onExitLock.Lock()
defer c.onExitLock.Unlock()
for k, f := range c.exitFunc {
c.logger.Infof("call function %s when commander exit", k)
f()
}
}
// HandShakeDone tell startProcess/attachProcess that handshake finished
func (c *Commander) HandShakeDone(token string) {
c.handshakeDoneLock.Lock()
defer c.handshakeDoneLock.Unlock()
if c.handshakeDone != nil {
// c.handshakeDone may be closed before put err into it, this will make
// panic
defer func() {
if err := recover(); err != nil {
c.logger.WithField("func", "HandShakeDone").Error(err)
}
}()
var err error
if token == "" {
err = fmt.Errorf("invalid handshake token")
} else if token != c.handshakeToken {
err = fmt.Errorf("handshake token not match, want[%s] but [%s]", c.handshakeToken, token)
}
c.handshakeDone <- err
}
}