agent/taskengine/commander/processor.go (222 lines of code) (raw):

package commander import ( "context" "fmt" "io" "sync" "sync/atomic" "time" "github.com/google/uuid" "github.com/aliyun/aliyun_assist_client/agent/commandermanager" "github.com/aliyun/aliyun_assist_client/agent/log" "github.com/aliyun/aliyun_assist_client/agent/taskengine/models" "github.com/aliyun/aliyun_assist_client/agent/util/process" "github.com/aliyun/aliyun_assist_client/thirdparty/sirupsen/logrus" "github.com/aliyun/aliyun_assist_client/agent/taskengine/taskerrors" ) var ( // submissionId => processor processorMg_ sync.Map ) type CommanderProcessor struct { TaskId string // Fundamental properties of command process CommandType string CommandContent string InvokeVersion int Repeat models.RunTaskRepeatType Timeout int WorkingDirectory string Username string Password string // commander-specific parameters Annotation map[string]string commanderName string commander *commandermanager.Commander submissionId string extraLubanParams string logger logrus.FieldLogger done context.Context doneFunc context.CancelFunc isResSetted atomic.Bool stdoutWriter io.Writer stderrWriter io.Writer exitCode int resStatus int resError *taskerrors.CommanderError } func NewCommanderProcessor(taskInfo models.RunTaskInfo, timeout int, annotation map[string]string, commanderName string) (*CommanderProcessor, error) { p := &CommanderProcessor{ TaskId: taskInfo.TaskId, InvokeVersion: taskInfo.InvokeVersion, CommandType: taskInfo.CommandType, Repeat: taskInfo.Repeat, Timeout: timeout, WorkingDirectory: taskInfo.WorkingDir, Username: taskInfo.Username, Password: taskInfo.Password, Annotation: annotation, commanderName: commanderName, extraLubanParams: fmt.Sprintf("&commanderName=%s", commanderName), } id := uuid.NewString()[:8] p.submissionId = p.TaskId + "_" + id p.logger = log.GetLogger().WithFields(logrus.Fields{ "submissionId": p.submissionId, }) p.logger.Info(taskInfo.TaskId) return p, nil } func (p *CommanderProcessor) PreCheck() (string, error) { if commander, err := commandermanager.GetCommanderRunning(p.commanderName); err != nil { return "", err } else { p.commander = commander } client, err := p.commander.Client() if err != nil { return err.Code, err } extraLubanParams, err := client.PreCheckScript(p.logger, context.Background(), p.submissionId, p.CommandType, p.Timeout, p.WorkingDirectory, p.Username, p.Password, p.Annotation) if err != nil { p.logger.WithError(err).Error("precheck script failed") return err.Code, err } p.extraLubanParams += fmt.Sprintf("&commanderVersion=%s", p.commander.Version()) p.extraLubanParams += extraLubanParams return "", nil } func (p *CommanderProcessor) Prepare(content string) error { client, err := p.commander.Client() if err != nil { return err } err = client.PrepareScript(p.logger, context.Background(), p.submissionId, content) if err != nil { p.logger.WithError(err).Error("prepare script failed") return err } p.CommandContent = content return nil } func (p *CommanderProcessor) SyncRun( stdoutWriter io.Writer, stderrWriter io.Writer, stdinReader io.Reader) (exitCode int, resStatus int, resError error) { p.stdoutWriter = stdoutWriter p.stderrWriter = stderrWriter p.done, p.doneFunc = context.WithCancel(context.Background()) defer p.doneFunc() storeTask(p.submissionId, p) defer deleteTask(p.submissionId) if err := p.submitScript(); err != nil { p.logger.WithError(err).Error("submit script failed") // the exitCode is only valid when process.Success return -1, process.Fail, err } defer func() { exitCode = p.exitCode resStatus = p.resStatus resError = p.resError }() p.commander.RegisterExitHandler(p.submissionId, func() { p.SetFinalStatus(-1, process.Fail, taskerrors.NewCommanderExitError("commander exit before task finished")) }) defer p.commander.DeRegisterExitHandler(p.submissionId) timeout := time.NewTimer(time.Duration(p.Timeout+3) * time.Second) for { select { case <-timeout.C: if p.isResSetted.CompareAndSwap(false, true) { p.resStatus = process.Timeout p.resError = taskerrors.NewStatusUpdateTimeoutError("commander not report task status before timeout") } return case <-p.done.Done(): return } } } func (p *CommanderProcessor) Cancel() error { client, err := p.commander.Client() if err != nil { return err } err = client.CancelSubmitted(p.logger, context.Background(), p.submissionId) if err != nil { p.logger.WithError(err).Error("cancel task failed") return err } return nil } func (p *CommanderProcessor) Cleanup(bool) error { client, err := p.commander.Client() if err != nil { return err } err = client.CleanUpSubmitted(p.logger, context.Background(), p.submissionId) if err != nil { p.logger.WithError(err).Error("clean up failed") return err } return nil } func (p *CommanderProcessor) ExtraLubanParams() string { return p.extraLubanParams } func (p *CommanderProcessor) SideEffect() error { client, err := p.commander.Client() if err != nil { return err } err = client.DisposeSubmission(p.logger, context.Background(), p.submissionId) if err != nil { p.logger.WithError(err).Error("dispose submission failed") return err } return nil } func (p *CommanderProcessor) SetFinalStatus(exitCode, taskStatus int, err *taskerrors.CommanderError) { if p.isResSetted.CompareAndSwap(false, true) { p.exitCode = exitCode p.resStatus = taskStatus p.resError = err if p.doneFunc != nil { p.doneFunc() } } } func (p *CommanderProcessor) WriteOutput(index int, output string) { if p.stdoutWriter != nil { p.stdoutWriter.Write([]byte(output)) } else if p.stderrWriter != nil { p.stderrWriter.Write([]byte(output)) } } func (p *CommanderProcessor) submitScript() error { client, err := p.commander.Client() if err != nil { return err } err = client.SubmitScript(p.logger, context.Background(), p.submissionId) if err != nil { p.logger.WithError(err).Error("submit script failed") return err } return nil } func FindCommanderTask(submissionId string) (*CommanderProcessor, error) { if value, ok := processorMg_.Load(submissionId); !ok { return nil, fmt.Errorf("submission not exist") } else if task, ok := value.(*CommanderProcessor); !ok { return nil, fmt.Errorf("type convert failed") } else { return task, nil } } func storeTask(submissionId string, p *CommanderProcessor) error { if _, ok := processorMg_.LoadOrStore(submissionId, p); ok { return fmt.Errorf("submission duplicated") } return nil } func deleteTask(submission string) { processorMg_.Delete(submission) }