in plugin/commander/container/servercmd.go [102:201]
func runServerCmd(ctx *cli.Context, args []string) error {
var err error
var waitTime int64
waitTime = defaultWaitTime
waitTimeStr, _ := ctx.Flags().Get(waitTimeFlagName).GetValue()
endpointStr, _ := ctx.Flags().Get(endpointFlagName).GetValue()
pidFile, _ := ctx.Flags().Get(pidFileFlagName).GetValue()
logPath, _ := ctx.Flags().Get(LogPathFlagName).GetValue()
if waitTimeStr != "" {
waitTime, err = strconv.ParseInt(waitTimeStr, 10, 0)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid param `%s`: %s\n", waitTimeFlagName, waitTimeStr)
return err
}
idlecheck.SetExtendSecond(int(waitTime))
}
log.InitLog("container_commander.log", logPath, true)
model.SetCommanderBaseInfo(commanderName, commanderSupportedApiVersion)
server.InitCommanderServer(taskManager)
go checkIdle(log.GetLogger())
// write pid file
if pidFile == "" {
pidFile = filepath.Join(os.TempDir(), defaultPidFile)
}
if err := os.WriteFile(pidFile, []byte(fmt.Sprint(os.Getpid())), 0600); err != nil {
fmt.Fprintln(os.Stderr, "Write pid file failed: ", err)
cli.Exit(1)
}
var ep *buses.Endpoint
if endpointStr != "" {
ep = &buses.Endpoint{}
if err := ep.Parse(endpointStr); err != nil {
fmt.Fprintf(os.Stderr, "Invalid endpoint `%s`: %v\n", endpointStr, err)
cli.Exit(1)
}
endpoint.SetEndpoint(ep)
} else {
endpoint.SetSocketAndProtocol(commanderSocketName, commanderProtocol)
}
// register commander to agent
go func() {
// wait for server listening
time.Sleep(time.Second * time.Duration(1))
endpointStr := os.Getenv("AXT_AGENT_MESSAGEBUS")
handshakeToken := os.Getenv("AXT_HANDSHAKE_TOKEN")
endpoint := buses.Endpoint{}
if err := endpoint.Parse(endpointStr); err != nil {
log.GetLogger().Errorf("Invalid endpoint from env AXT_AGENT_MESSAGEBUS `%s`: %v", endpointStr, err)
fmt.Fprintln(os.Stderr, "Invalid endpoint from env AXT_AGENT_MESSAGEBUS: ", endpointStr)
return
}
client.UpdateEndpoint(endpoint)
if err := client.RegisterCommander(log.GetLogger(), handshakeToken); err != nil {
log.GetLogger().Error("Register commander failed: ", err)
fmt.Fprint(os.Stderr, "Register commander failed: ", err)
return
}
fmt.Println("Register commander done")
log.GetLogger().Info("Register commander done")
}()
log.GetLogger().Infof("Starting...... version: %s githash: %s", version.AssistVersion, version.GitCommitHash)
signalCh := listenSignal()
for {
serveErr := make(chan error)
server, err := messagebus_server.ListenAndServe(log.GetLogger(), *endpoint.GetEndpoint(true), serveErr,
[]messagebus_server.RegisterFunc{
server.RegisterCommanderServer,
},
grpc.UnaryInterceptor(
interceptor.KeepProcessAlive,
),
)
if err != nil {
log.GetLogger().Error("Listen faield: ", err)
fmt.Println("Listen failed: ", err)
cli.Exit(1)
}
log.GetLogger().Infof("Listenning on endpoint: %+v", endpoint.GetEndpoint(false).String())
select {
case err := <-serveErr:
log.GetLogger().Error("Server failed: ", err)
fmt.Println("Server failed: ", err)
cli.Exit(1)
case s := <-signalCh:
if s == syscall.SIGUSR1 {
// restart grpc server
log.GetLogger().Info("Recv signal SIGUSR1, restart server")
server.Stop()
} else if s == syscall.SIGPIPE {
log.GetLogger().Warn("Recv signal SIGPIPE, do nothing")
} else {
log.GetLogger().Errorf("Recv unhandled signal[%d]", s)
}
}
}
}