commander/ipc/server/server.go (195 lines of code) (raw):

// Package server implements the gRPC Commander service that the agent calls
// to hand shake with the commander and to drive task life cycles.
package server

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/aliyun/aliyun_assist_client/agent/log"
	"github.com/aliyun/aliyun_assist_client/commander/ipc/client"
	"github.com/aliyun/aliyun_assist_client/commander/model"
	"github.com/aliyun/aliyun_assist_client/commander/taskinterface"
	pb "github.com/aliyun/aliyun_assist_client/interprocess/commander/agrpc"
	"github.com/aliyun/aliyun_assist_client/interprocess/messagebus/buses"
	"github.com/aliyun/aliyun_assist_client/thirdparty/sirupsen/logrus"
	"google.golang.org/grpc"
)

// CommanderServer is the pb.CommanderServer implementation. Task-related RPCs
// are delegated to the configured taskinterface.TaskManager.
type CommanderServer struct {
	pb.UnimplementedCommanderServer

	logger      logrus.FieldLogger
	taskManager taskinterface.TaskManager
}

// commanderServer is the package-wide instance created by InitCommanderServer
// and registered by RegisterCommanderServer.
var commanderServer *CommanderServer

// InitCommanderServer creates the package-wide CommanderServer on first use
// and returns it.
//
// It returns nil when taskManagerImpl is nil. If an instance already exists it
// is returned unchanged, so taskManagerImpl is ignored on later calls. No
// locking is performed here.
func InitCommanderServer(taskManagerImpl taskinterface.TaskManager) *CommanderServer {
	if taskManagerImpl == nil {
		return nil
	}
	if commanderServer != nil {
		return commanderServer
	}
	commanderServer = &CommanderServer{
		logger:      log.GetLogger().WithField("role", "server"),
		taskManager: taskManagerImpl,
	}
	return commanderServer
}

// newRespStatus returns a fresh status describing success (code 0, "OK").
// Handlers overwrite its fields when a request fails.
func newRespStatus() *pb.RespStatus {
	return &pb.RespStatus{
		StatusCode: 0,
		ErrMessage: "OK",
	}
}

// RegisterCommanderServer registers the package-wide CommanderServer with sr.
// The instance is whatever InitCommanderServer has stored so far, which is nil
// if InitCommanderServer has not yet succeeded.
func RegisterCommanderServer(sr grpc.ServiceRegistrar) {
	pb.RegisterCommanderServer(sr, commanderServer)
}

// Handshake parses the endpoint carried in req, stores it via
// client.UpdateEndpoint and then registers this commander with the handshake
// token. Failures are reported through resp.Status (code 1); the returned
// error is always nil.
func (c *CommanderServer) Handshake(ctx context.Context, req *pb.HandshakeReq) (*pb.HandshakeResp, error) {
	c.logger.Info("Recv Handshake")
	defer c.logger.Info("Resp Handshake")

	resp := &pb.HandshakeResp{
		Status: newRespStatus(),
	}

	endpoint := buses.Endpoint{}
	if err := endpoint.Parse(req.Endpoint); err != nil {
		resp.Status.StatusCode = 1
		resp.Status.ErrMessage = fmt.Sprint("Invalid endpoint: ", err)
		return resp, nil
	}
	client.UpdateEndpoint(endpoint)
	c.logger.Info("endpoint: ", endpoint.String())

	if err := client.RegisterCommander(c.logger, req.HandshakeToken); err != nil {
		resp.Status.StatusCode = 1
		resp.Status.ErrMessage = fmt.Sprint("Register commander failed: ", err)
	}
	return resp, nil
}

// Ping answers with a success status.
func (c *CommanderServer) Ping(ctx context.Context, req *pb.PingReq) (*pb.PingResp, error) {
	c.logger.Info("Recv Ping")
	defer c.logger.Info("Resp Ping")

	return &pb.PingResp{
		Status: newRespStatus(),
	}, nil
}

// ConnectFromAgent answers with a success status and the API version reported
// by model.GetCommanderBaseInfo.
func (c *CommanderServer) ConnectFromAgent(ctx context.Context, req *pb.ConnectFromAgentReq) (*pb.ConnectFromAgentResp, error) {
	c.logger.Info("Recv ConnectFromAgent")
	defer c.logger.Info("Resp ConnectFromAgent")

	_, commanderSupportedApiVersion := model.GetCommanderBaseInfo()
	return &pb.ConnectFromAgentResp{
		Status:     newRespStatus(),
		ApiVersion: commanderSupportedApiVersion,
	}, nil
}

// PreCheckScript creates a task for the submission and runs its pre-check.
//
// req.Annotation is decoded as a JSON object of string values. An empty
// annotation is treated as "no annotations"; malformed JSON fails the RPC with
// a non-nil error. If the task cannot be created, resp.Status is set to code 1
// and resp.TaskError carries the details. A pre-check failure is reported
// through resp.TaskError only, leaving resp.Status at its success value.
func (c *CommanderServer) PreCheckScript(ctx context.Context, req *pb.PreCheckScriptReq) (*pb.PreCheckScriptResp, error) {
	c.logger.WithField("submissionId", req.SubmissionId).Infof("Recv PreCheckScript: %+v", req)
	resp := &pb.PreCheckScriptResp{
		Status: newRespStatus(),
	}
	// resp is a pointer, so the deferred log prints its final contents.
	defer c.logger.WithField("submissionId", req.SubmissionId).Infof("Resp PreCheckScript: %+v", resp)

	annotation := make(map[string]string)
	// An unset proto string arrives as "", which is not valid JSON; skip
	// decoding instead of failing the whole request.
	if req.Annotation != "" {
		if err := json.Unmarshal([]byte(req.Annotation), &annotation); err != nil {
			return nil, fmt.Errorf("parse annotation: %w", err)
		}
		// JSON null resets the map to nil; hand the task manager an empty,
		// writable map in that case as well.
		if annotation == nil {
			annotation = make(map[string]string)
		}
	}

	task, err := c.taskManager.NewTask(req.SubmissionId, req.CommandType, int(req.Timeout), req.WorkingDir, req.UserName, annotation)
	if err != nil {
		resp.Status.StatusCode = 1
		resp.Status.ErrMessage = fmt.Sprint("Create task failed: ", err)
		resp.TaskError = &pb.TaskError{
			ErrorCode:    err.ErrorCode,
			ErrorSubCode: err.ErrorSubCode,
			ErrorMessage: err.ErrorMessage,
		}
		return resp, nil
	}

	if taskerror := task.PreCheck(); taskerror != nil {
		resp.TaskError = &pb.TaskError{
			ErrorCode:    taskerror.ErrorCode,
			ErrorSubCode: taskerror.ErrorSubCode,
			ErrorMessage: taskerror.ErrorMessage,
		}
	}
	resp.ExtraLubanParams = task.ExtraLubanParams()
	return resp, nil
}

// PrepareScript passes the script content of the submission to the task
// manager. On failure resp.Status is set to code 1 and resp.TaskError carries
// the details; the returned error is always nil.
func (c *CommanderServer) PrepareScript(ctx context.Context, req *pb.PrepareScriptReq) (*pb.PrepareScriptResp, error) {
	c.logger.WithField("submissionId", req.SubmissionId).Infof("Recv PrepareScript: %+v", req)
	resp := &pb.PrepareScriptResp{
		Status: newRespStatus(),
	}
	defer c.logger.WithField("submissionId", req.SubmissionId).Infof("Resp PrepareScript: %+v", resp)

	if err := c.taskManager.PrepareTask(req.SubmissionId, req.Content); err != nil {
		resp.Status.StatusCode = 1
		resp.Status.ErrMessage = fmt.Sprint("Prepare task failed: ", err)
		resp.TaskError = &pb.TaskError{
			ErrorCode:    err.ErrorCode,
			ErrorSubCode: err.ErrorSubCode,
			ErrorMessage: err.ErrorMessage,
		}
	}
	// task, err := c.taskManager.LoadTask(submissionId)
	// if err == nil {
	// 	err = task.Prepare(req.Content)
	// }
	return resp, nil
}

// SubmitScript asks the task manager to run the task of the submission. On
// failure resp.Status is set to code 1 and resp.TaskError carries the details;
// the returned error is always nil.
func (c *CommanderServer) SubmitScript(ctx context.Context, req *pb.SubmitScriptReq) (*pb.SubmitScriptResp, error) {
	c.logger.WithField("submissionId", req.SubmissionId).Info("Recv SubmitScript")
	resp := &pb.SubmitScriptResp{
		Status: newRespStatus(),
	}
	defer c.logger.WithField("submissionId", req.SubmissionId).Infof("Resp SubmitScript: %+v", resp)

	if err := c.taskManager.RunTask(req.SubmissionId); err != nil {
		resp.Status.StatusCode = 1
		resp.Status.ErrMessage = fmt.Sprint("Submit task failed: ", err)
		resp.TaskError = &pb.TaskError{
			ErrorCode:    err.ErrorCode,
			ErrorSubCode: err.ErrorSubCode,
			ErrorMessage: err.ErrorMessage,
		}
	}
	return resp, nil
}

// CancelSubmitted asks the task manager to cancel the task of the submission.
// On failure resp.Status is set to code 1; the returned error is always nil.
func (c *CommanderServer) CancelSubmitted(ctx context.Context, req *pb.CancelSubmittedReq) (*pb.CancelSubmittedResp, error) {
	c.logger.WithField("submissionId", req.SubmissionId).Info("Recv CancelSubmitted")
	resp := &pb.CancelSubmittedResp{
		Status: newRespStatus(),
	}
	defer c.logger.WithField("submissionId", req.SubmissionId).Infof("Resp CancelSubmitted: %+v", resp)

	if err := c.taskManager.CancelTask(req.SubmissionId); err != nil {
		resp.Status.StatusCode = 1
		resp.Status.ErrMessage = fmt.Sprint("Cancel task failed: ", err)
	}
	// task.Cancel()
	return resp, nil
}

// CleanUpSubmitted asks the task manager to clean up the task of the
// submission. On failure resp.Status is set to code 1; the returned error is
// always nil.
func (c *CommanderServer) CleanUpSubmitted(ctx context.Context, req *pb.CleanUpSubmittedReq) (*pb.CleanUpSubmittedResp, error) {
	c.logger.WithField("submissionId", req.SubmissionId).Info("Recv CleanUpSubmitted")
	resp := &pb.CleanUpSubmittedResp{
		Status: newRespStatus(),
	}
	defer c.logger.WithField("submissionId", req.SubmissionId).Infof("Resp CleanUpSubmitted: %+v", resp)

	if err := c.taskManager.CleanupTask(req.SubmissionId); err != nil {
		resp.Status.StatusCode = 1
		resp.Status.ErrMessage = fmt.Sprint("Clean up task failed: ", err)
	}
	return resp, nil
}

// DisposeSubmission asks the task manager to dispose of the task of the
// submission. On failure resp.Status is set to code 1; the returned error is
// always nil.
func (c *CommanderServer) DisposeSubmission(ctx context.Context, req *pb.DisposeSubmissionReq) (*pb.DisposeSubmissionResp, error) {
	c.logger.WithField("submissionId", req.SubmissionId).Info("Recv DisposeSubmission")
	resp := &pb.DisposeSubmissionResp{
		Status: newRespStatus(),
	}
	defer c.logger.WithField("submissionId", req.SubmissionId).Infof("Resp DisposeSubmission: %+v", resp)

	if err := c.taskManager.DisposeTask(req.SubmissionId); err != nil {
		resp.Status.StatusCode = 1
		resp.Status.ErrMessage = fmt.Sprint("Dispose task failed: ", err)
	}
	return resp, nil
}