in plugin/commander/container/taskmanager/task.go [81:163]
func (task *Task) Run() {
taskLogger := task.logger.WithField("Phase", "Running")
stdouterrR, stdouterrW, err := os.Pipe()
if err != nil {
taskLogger.Error("Create pipe failed: ", err)
taskError := taskerrors.NewCreatePipeError(err)
task.sendError(taskError)
return
}
// Task phase is running
task.phase = PHASE_RUNNING
task.phaseStartTime = time.Now()
defer func() {
// Task phase is exited
task.phase = PHASE_EXITED
task.phaseStartTime = time.Now()
}()
task.sendRunningOutput("")
stoppedSendRunning := make(chan struct{}, 1)
buf := make([]byte, 10240)
go func() {
defer close(stoppedSendRunning)
for {
stdouterrR.SetReadDeadline(time.Now().Add(time.Minute))
n, err := stdouterrR.Read(buf)
if n > 0 {
task.sendRunningOutput(string(buf[:n]))
} else {
task.sendRunningOutput("")
}
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) {
taskLogger.Info("read stdouterr finished")
} else {
taskLogger.Error("read stdouterr failed: ", err)
}
return
}
}
}()
taskLogger.Info("Start command process")
var status int
var taskerror *taskerrors.TaskError
var ok bool
task.exit_code, status, err = task.processer.SyncRun(stdouterrW, stdouterrW, nil)
if err != nil {
if taskerror, ok = err.(*taskerrors.TaskError); !ok {
executeError := container_errors.NewGeneralExecutionError(err)
taskerror = taskerrors.NewTaskError(executeError.ErrorCode, executeError.ErrorSubCode, executeError.ErrorMessage)
}
}
taskLogger.Infof("Tasks syncrun done, exitCode[%d] status[%d] taskerror: %v", task.exit_code, status, taskerror)
if status == process.Success {
taskLogger.WithFields(logrus.Fields{
"exitcode": task.exit_code,
"extraError": taskerror,
}).Info("Finished command process")
} else if status == process.Timeout {
taskLogger.WithFields(logrus.Fields{
"attchedError": taskerror,
}).Info("Terminated command process due to timeout")
} else if status == process.Fail {
taskLogger.WithError(taskerror).Info("Failed command process")
} else {
taskLogger.WithFields(logrus.Fields{
"exitcode": task.exit_code,
"status": status,
"attchedError": taskerror,
}).Warn("Ended command process with unexpected status")
}
// close stdouterrW make stdouterrR read goroutine return
stdouterrW.Close()
<-stoppedSendRunning
stdouterrR.Close()
task.sendOutput("", status, taskerror)
}