plugin/commander/container/taskmanager/task.go (172 lines of code) (raw):

package taskmanager import ( "errors" "io" "os" "sync/atomic" "time" "github.com/aliyun/aliyun_assist_client/agent/util/process" "github.com/aliyun/aliyun_assist_client/commander/ipc/client" "github.com/aliyun/aliyun_assist_client/commander/taskerrors" "github.com/aliyun/aliyun_assist_client/plugin/commander/container/model" container_errors "github.com/aliyun/aliyun_assist_client/plugin/commander/container/taskerrors" "github.com/aliyun/aliyun_assist_client/thirdparty/sirupsen/logrus" ) type Task struct { SubmissionId string taskInfo TaskInfo processer model.TaskProcessor canceled atomic.Bool sendIndex atomic.Int32 exit_code int logger logrus.FieldLogger extraLubanParams string // Record task phase and when current phase began. // phase and phaseModifyTime are used to clear expired tasks. phase int phaseStartTime time.Time } type TaskInfo struct { ContainerId string ContainerName string CommandType string Timeout int WorkingDir string Username string } const ( // prechecked PHASE_PRECHECKED = iota // running PHASE_RUNNING // process exited or task is canceled, but Dispose not called PHASE_EXITED ) // PreCheck call processer.PreCheck check if task is valid. func (task *Task) PreCheck() *taskerrors.TaskError { taskLogger := task.logger.WithField("Phase", "Pre-checking") if _, taskerror := task.processer.PreCheck(); taskerror != nil { taskLogger.WithError(taskerror).Errorf("Task precheck failed.") return taskerrors.NewTaskError(taskerror.ErrorCode, taskerror.ErrorSubCode, taskerror.ErrorMessage) } // if err := storeTask(task.SubmissionId, task); err != nil { // return err // } // Task phase is precheked task.phase = PHASE_PRECHECKED task.phaseStartTime = time.Now() taskLogger.Infof("Store task: %+v", task.taskInfo) return nil } func (task *Task) Prepare(content string) *taskerrors.TaskError { taskLogger := task.logger.WithField("Phase", "Prepare") if err := task.processer.Prepare(content); err != nil { taskLogger.Error("Processer prepare failed: ", err) return taskerrors.NewTaskError(err.ErrorCode, err.ErrorSubCode, err.ErrorMessage) } return nil } func (task *Task) Run() { taskLogger := task.logger.WithField("Phase", "Running") stdouterrR, stdouterrW, err := os.Pipe() if err != nil { taskLogger.Error("Create pipe failed: ", err) taskError := taskerrors.NewCreatePipeError(err) task.sendError(taskError) return } // Task phase is running task.phase = PHASE_RUNNING task.phaseStartTime = time.Now() defer func() { // Task phase is exited task.phase = PHASE_EXITED task.phaseStartTime = time.Now() }() task.sendRunningOutput("") stoppedSendRunning := make(chan struct{}, 1) buf := make([]byte, 10240) go func() { defer close(stoppedSendRunning) for { stdouterrR.SetReadDeadline(time.Now().Add(time.Minute)) n, err := stdouterrR.Read(buf) if n > 0 { task.sendRunningOutput(string(buf[:n])) } else { task.sendRunningOutput("") } if err != nil { if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) { taskLogger.Info("read stdouterr finished") } else { taskLogger.Error("read stdouterr failed: ", err) } return } } }() taskLogger.Info("Start command process") var status int var taskerror *taskerrors.TaskError var ok bool task.exit_code, status, err = task.processer.SyncRun(stdouterrW, stdouterrW, nil) if err != nil { if taskerror, ok = err.(*taskerrors.TaskError); !ok { executeError := container_errors.NewGeneralExecutionError(err) taskerror = taskerrors.NewTaskError(executeError.ErrorCode, executeError.ErrorSubCode, executeError.ErrorMessage) } } taskLogger.Infof("Tasks syncrun done, exitCode[%d] status[%d] taskerror: %v", task.exit_code, status, taskerror) if status == process.Success { taskLogger.WithFields(logrus.Fields{ "exitcode": task.exit_code, "extraError": taskerror, }).Info("Finished command process") } else if status == process.Timeout { taskLogger.WithFields(logrus.Fields{ "attchedError": taskerror, }).Info("Terminated command process due to timeout") } else if status == process.Fail { taskLogger.WithError(taskerror).Info("Failed command process") } else { taskLogger.WithFields(logrus.Fields{ "exitcode": task.exit_code, "status": status, "attchedError": taskerror, }).Warn("Ended command process with unexpected status") } // close stdouterrW make stdouterrR read goroutine return stdouterrW.Close() <-stoppedSendRunning stdouterrR.Close() task.sendOutput("", status, taskerror) } func (task *Task) Cancel() *taskerrors.TaskError { if task.canceled.CompareAndSwap(false, true) { task.processer.Cancel() } // Task phase is exited task.phase = PHASE_EXITED task.phaseStartTime = time.Now() return nil } func (task *Task) Cleanup() *taskerrors.TaskError { // actualy processer.Cleanup does nothing return taskerrors.NewGeneralError(task.processer.Cleanup()) } // Dispose is the last step, delete task from taskMap_ in this step func (task *Task) Dispose() *taskerrors.TaskError { task.logger.Infof("Delete task for dispose: %+v", task.taskInfo) return taskerrors.NewGeneralError(task.processer.SideEffect()) } func (task *Task) ExtraLubanParams() string { return task.extraLubanParams } func (task *Task) sendError(taskError *taskerrors.TaskError) { idx := task.sendIndex.Add(1) - 1 // FinalizeSubmissionStatus: exitCode is valid // only when the taskStatus is process.Success. task.logger.Infof("send error: index[%d] err: %s", idx, taskError) if err := client.FinalizeSubmissionStatus(task.logger, int(idx), task.SubmissionId, "", 0, process.Fail, taskError); err != nil { task.logger.Error("sendError failed: ", err) } } func (task *Task) sendOutput(output string, taskStatus int, taskError *taskerrors.TaskError) { idx := task.sendIndex.Add(1) - 1 task.logger.Infof("send output: index[%d] len[%d]", idx, len(output)) if err := client.FinalizeSubmissionStatus(task.logger, int(idx), task.SubmissionId, output, task.exit_code, taskStatus, taskError); err != nil { task.logger.Error("send output failed: ", err) } } func (task *Task) sendRunningOutput(output string) { idx := task.sendIndex.Add(1) - 1 task.logger.Infof("send running output: index[%d] len[%d]", idx, len(output)) if err := client.WriteSubmissionOutput(task.logger, int(idx), task.SubmissionId, output); err != nil { task.logger.Error("send running output failed: ", err) } }