in agent/taskengine/basetask.go [234:462]
func (task *Task) Run() (taskerrors.ErrorCode, error) {
if err := task.PreCheck(false); err != nil {
if taskError, ok := err.(taskerrors.ExecutionError); ok {
return taskError.ErrCode(), taskError
}
return 0, err
}
// Reuse specified logger across whole task running phase
taskLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": task.taskInfo.TaskId,
"InvokeVersion": task.taskInfo.InvokeVersion,
"Phase": "Running",
"disableRingbuffer": task.disableOutputRingbuffer,
})
taskLogger.Info("Run task")
taskLogger.Info("Prepare script file of task")
decodeBytes, err := base64.StdEncoding.DecodeString(task.taskInfo.Content)
if err != nil {
task.SendError("", taskerrors.WrapErrBase64DecodeFailed, fmt.Sprintf("Base64DecodeFailed: %s", err.Error()))
return taskerrors.WrapErrBase64DecodeFailed, errors.New("decode error")
}
doNotLogScript := false
content := string(decodeBytes)
if task.taskInfo.EnableParameter {
content, err = parameters.ResolveBuiltinParameters(content, task.taskInfo.BuiltinParameters)
if err != nil {
if invalidErr, ok := err.(taskerrors.InvalidSettingError); ok {
task.SendInvalidTask("InvalidEnvironmentParameter", invalidErr.ShortMessage())
} else if taskErr, ok := err.(taskerrors.ExecutionError); ok {
task.SendError("", taskErr.ErrCode(), taskErr.Error())
return taskErr.ErrCode(), err
} else {
task.SendError("", taskerrors.WrapErrResolveEnvironmentParameterFailed, err.Error())
}
return taskerrors.WrapErrResolveEnvironmentParameterFailed, err
}
if strings.Contains(content, "oos-secret") {
// Do not log script which contains secret params
doNotLogScript = true
}
content, err = paramstore.ReplaceAllParameterStore(content)
if err != nil {
task.SendInvalidTask(err.Error(), content)
return 0, errors.New("ReplaceAllParameterStore error")
}
}
if !doNotLogScript {
taskLogger.Info("script content:", content)
}
switch task.taskInfo.CommandType {
case "RunBatScript":
content = "@echo off\r\n" + content
fallthrough
case "RunPowerShellScript":
if !flagging.IsNormalizingCRLFDisabled() {
content = scriptmanager.NormalizeCRLF(content)
}
}
if err := task.processer.Prepare(content); err != nil {
taskLogger.WithError(err).Errorln("Failed to prepare command process")
if commErr, ok := err.(*taskerrors.CommanderError); ok {
task.SendError("", taskerrors.Stringer(commErr.SubCode), commErr.Error())
return taskerrors.WrapGeneralError, err
} else if validationErr, ok := err.(taskerrors.NormalizedValidationError); ok {
task.SendInvalidTask(validationErr.Param(), validationErr.Value())
return taskerrors.WrapGeneralError, err
} else if taskErr, ok := err.(taskerrors.ExecutionError); ok {
task.SendError("", taskErr.ErrCode(), taskErr.Error())
return taskErr.ErrCode(), err
} else if invalidErr, ok := err.(taskerrors.InvalidSettingError); ok {
task.SendInvalidTask("InvalidEnvironmentParameter", invalidErr.ShortMessage())
return taskerrors.WrapErrResolveEnvironmentParameterFailed, err
} else {
return taskerrors.WrapGeneralError, err
}
}
taskLogger.Info("Prepare command process")
var stdouterrWriter io.Writer
totalQuoto := task.taskInfo.Output.LogQuota
if totalQuoto < defaultQuoto {
totalQuoto = defaultQuoto
}
stdouterrWriter, err = task.outputBuf.Init(defaultQuotoPre, totalQuoto-defaultQuotoPre)
if err != nil {
taskLogger.Error("init output buf failed: ", err)
taskError := taskerrors.NewInitOutputBufError(err)
task.SendError("", taskError.ErrCode(), taskError.Error())
return taskError.ErrCode(), err
}
task.startTime = time.Now()
task.monotonicStartTimestamp = timetool.ToAccurateTime(task.startTime.Local())
if taskError := task.sendTaskStart(); taskError != nil {
taskLogger.WithError(taskError).Error("Send starting event failed")
return taskError.ErrCode(), taskError
}
taskLogger.Infof("Sent starting event")
// Replace variable representing states with context and channel operation,
// to replace dangerous state tranfering operation with straightforward
// message passing action.
ctx, stopSendRunning := context.WithCancel(context.Background())
stoppedSendRunning := make(chan struct{}, 1)
go func(ctx context.Context, stoppedSendRunning chan<- struct{}) {
defer close(stoppedSendRunning)
task.data_sended = 0
// Running output is not needed to be reported during invocation of
// periodic tasks. But stoppedSendRunning channel is still needed to be
// closed correctly.
if task.taskInfo.Cronat != "" {
return
}
intervalMs := task.taskInfo.Output.Interval
if intervalMs < 1000 {
intervalMs = 1000
}
ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
lastReportOutputTime := time.Now()
defer ticker.Stop()
for {
// serve the stop signal from context channel with higher priority
select {
case <-ctx.Done():
return
default:
// fallthrough to the next select
}
select {
case <-ticker.C:
outputPre := task.outputBuf.ReadPre()
if reported := task.sendRunningOutput(string(outputPre), lastReportOutputTime); reported {
lastReportOutputTime = time.Now()
}
if len(outputPre) > 0 {
atomic.AddUint32(&task.data_sended, uint32(len(outputPre)))
taskLogger.Infof("Running output sent: %d bytes", atomic.LoadUint32(&task.data_sended))
} else {
taskLogger.Infof("Running output sent: %d bytes, just report running no output sent", atomic.LoadUint32(&task.data_sended))
}
case <-ctx.Done():
return
}
}
}(ctx, stoppedSendRunning)
taskLogger.Info("Start command process")
var status int
task.exit_code, status, err = task.processer.SyncRun(stdouterrWriter, stdouterrWriter, nil)
if status == process.Success {
taskLogger.WithFields(logrus.Fields{
"exitcode": task.exit_code,
"extraError": err,
}).Info("Finished command process")
} else if status == process.Timeout {
taskLogger.WithFields(logrus.Fields{
"attchedError": err,
}).Info("Terminated command process due to timeout")
} else if status == process.Fail {
taskLogger.WithError(err).Info("Failed command process")
} else {
taskLogger.WithFields(logrus.Fields{
"exitcode": task.exit_code,
"status": status,
"attchedError": err,
}).Warn("Ended command process with unexpected status")
}
// That is, send stopping message to the goroutine sending running output
stopSendRunning()
// Wait for the goroutine sending running output to exit
<-stoppedSendRunning
task.endTime = time.Now()
task.monotonicEndTimestamp = timetool.ToAccurateTime(timetool.ToStableElapsedTime(task.endTime, task.startTime).Local())
postOutput := string(task.outputBuf.ReadAll())
task.droped = task.outputBuf.Dropped()
if status == process.Fail {
if err == nil {
task.sendOutput("failed", postOutput)
} else if executionErr, ok := err.(taskerrors.NormalizedExecutionError); ok {
task.SendError(postOutput, taskerrors.Stringer(executionErr.Code()), executionErr.Description())
} else if taskErr, ok := err.(taskerrors.ExecutionError); ok {
task.SendError(postOutput, taskErr.ErrCode(), taskErr.Error())
} else {
task.SendError(postOutput, taskerrors.WrapErrExecuteScriptFailed, fmt.Sprintf("ExecuteScriptFailed: %s", err.Error()))
}
} else if status == process.Timeout {
task.sendOutput("timeout", postOutput)
} else {
if task.IsCancled() == false {
task.sendOutput("finished", postOutput)
}
}
endTaskLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": task.taskInfo.TaskId,
"InvokeVersion": task.taskInfo.InvokeVersion,
"Phase": "Ending",
})
endTaskLogger.Info("Sent final output and state")
if err := task.outputBuf.Uninit(); err != nil {
endTaskLogger.Error("Task outputbuffer err: ", err)
} else {
endTaskLogger.Info("Clean task output")
}
if err := task.processer.Cleanup(!flagging.GetTaskKeepScriptFile()); err != nil {
endTaskLogger.WithError(err).Errorln("Failed to cleanup after command finished")
}
// Perform instructed poweroff/reboot action after task finished
if err := task.processer.SideEffect(); err != nil {
endTaskLogger.WithError(err).Errorln("Failed to apply side-effect of command after finished")
}
return 0, nil
}