func()

in agent/taskengine/basetask.go [234:462]


// Run executes the task synchronously, from pre-check to the final report.
//
// The phases run in this order:
//  1. PreCheck the task.
//  2. Base64-decode the script content and, when EnableParameter is set,
//     resolve builtin parameters and parameter-store references.
//  3. Adjust the content for the command type and pass it to the processer's
//     Prepare.
//  4. Initialise the output buffer and send the task-start event.
//  5. Start a goroutine that periodically reports running output (it exits
//     immediately when Cronat is non-empty), then run the command with
//     SyncRun.
//  6. Stop and wait for that goroutine, send the final output and state,
//     then call outputBuf.Uninit, processer.Cleanup and processer.SideEffect.
//
// A non-nil error is returned only when one of the phases before SyncRun
// fails; the accompanying code is usually the matching taskerrors code, but
// it is 0 when PreCheck fails with an error that is not an ExecutionError
// and when ReplaceAllParameterStore fails. Once SyncRun has been called the
// method always returns (0, nil): command failure and timeout are reported
// through sendOutput/SendError, not through the return values.
func (task *Task) Run() (taskerrors.ErrorCode, error) {
	if err := task.PreCheck(false); err != nil {
		if taskError, ok := err.(taskerrors.ExecutionError); ok {
			return taskError.ErrCode(), taskError
		}
		return 0, err
	}

	// Reuse specified logger across whole task running phase
	taskLogger := log.GetLogger().WithFields(logrus.Fields{
		"TaskId":            task.taskInfo.TaskId,
		"InvokeVersion":     task.taskInfo.InvokeVersion,
		"Phase":             "Running",
		"disableRingbuffer": task.disableOutputRingbuffer,
	})
	taskLogger.Info("Run task")

	// ---- Phase: build the script content --------------------------------
	taskLogger.Info("Prepare script file of task")
	decodeBytes, err := base64.StdEncoding.DecodeString(task.taskInfo.Content)
	if err != nil {
		task.SendError("", taskerrors.WrapErrBase64DecodeFailed, fmt.Sprintf("Base64DecodeFailed: %s", err.Error()))
		return taskerrors.WrapErrBase64DecodeFailed, errors.New("decode error")
	}
	doNotLogScript := false
	content := string(decodeBytes)
	if task.taskInfo.EnableParameter {
		content, err = parameters.ResolveBuiltinParameters(content, task.taskInfo.BuiltinParameters)
		if err != nil {
			// The cases are tried in order, exactly like a chain of type
			// assertions. Only the ExecutionError case returns its own code;
			// the other two fall through to the shared return below.
			switch resolveErr := err.(type) {
			case taskerrors.InvalidSettingError:
				task.SendInvalidTask("InvalidEnvironmentParameter", resolveErr.ShortMessage())
			case taskerrors.ExecutionError:
				task.SendError("", resolveErr.ErrCode(), resolveErr.Error())
				return resolveErr.ErrCode(), err
			default:
				task.SendError("", taskerrors.WrapErrResolveEnvironmentParameterFailed, err.Error())
			}

			return taskerrors.WrapErrResolveEnvironmentParameterFailed, err
		}

		// The check runs before parameter-store substitution, so it looks at
		// the content while it still holds the "oos-secret" marker text.
		if strings.Contains(content, "oos-secret") {
			// Do not log script which contains secret params
			doNotLogScript = true
		}
		content, err = paramstore.ReplaceAllParameterStore(content)
		if err != nil {
			// NOTE(review): this path returns error code 0 together with a
			// non-nil error, unlike the other failure paths — confirm callers
			// expect that.
			task.SendInvalidTask(err.Error(), content)
			return 0, errors.New("ReplaceAllParameterStore error")
		}
	}
	if !doNotLogScript {
		taskLogger.Info("script content:", content)
	}

	// Bat scripts get an "@echo off" header and then share the CRLF
	// normalisation applied to PowerShell scripts.
	switch task.taskInfo.CommandType {
	case "RunBatScript":
		content = "@echo off\r\n" + content

		fallthrough
	case "RunPowerShellScript":
		if !flagging.IsNormalizingCRLFDisabled() {
			content = scriptmanager.NormalizeCRLF(content)
		}
	}

	// ---- Phase: prepare the command process -----------------------------
	if err := task.processer.Prepare(content); err != nil {
		taskLogger.WithError(err).Errorln("Failed to prepare command process")
		switch prepareErr := err.(type) {
		case *taskerrors.CommanderError:
			task.SendError("", taskerrors.Stringer(prepareErr.SubCode), prepareErr.Error())
			return taskerrors.WrapGeneralError, err
		case taskerrors.NormalizedValidationError:
			task.SendInvalidTask(prepareErr.Param(), prepareErr.Value())
			return taskerrors.WrapGeneralError, err
		case taskerrors.ExecutionError:
			task.SendError("", prepareErr.ErrCode(), prepareErr.Error())
			return prepareErr.ErrCode(), err
		case taskerrors.InvalidSettingError:
			task.SendInvalidTask("InvalidEnvironmentParameter", prepareErr.ShortMessage())
			return taskerrors.WrapErrResolveEnvironmentParameterFailed, err
		default:
			// NOTE(review): unlike the cases above, an unrecognised error is
			// returned without any SendError/SendInvalidTask call — confirm
			// the caller reports it.
			return taskerrors.WrapGeneralError, err
		}
	}

	taskLogger.Info("Prepare command process")
	var stdouterrWriter io.Writer
	// Never use less than the default quota for captured output.
	totalQuoto := task.taskInfo.Output.LogQuota
	if totalQuoto < defaultQuoto {
		totalQuoto = defaultQuoto
	}
	stdouterrWriter, err = task.outputBuf.Init(defaultQuotoPre, totalQuoto-defaultQuotoPre)
	if err != nil {
		// NOTE(review): this return, and the one after sendTaskStart below,
		// leave Run after processer.Prepare succeeded but without calling
		// processer.Cleanup (nor outputBuf.Uninit below) — verify that the
		// caller releases those resources.
		taskLogger.Error("init output buf failed: ", err)
		taskError := taskerrors.NewInitOutputBufError(err)
		task.SendError("", taskError.ErrCode(), taskError.Error())
		return taskError.ErrCode(), err
	}

	task.startTime = time.Now()
	task.monotonicStartTimestamp = timetool.ToAccurateTime(task.startTime.Local())
	if taskError := task.sendTaskStart(); taskError != nil {
		taskLogger.WithError(taskError).Error("Send starting event failed")
		return taskError.ErrCode(), taskError
	}
	taskLogger.Infof("Sent starting event")

	// ---- Phase: run the command while reporting its output --------------
	// Replace variable representing states with context and channel operation,
	// to replace dangerous state transferring operation with straightforward
	// message passing action.
	ctx, stopSendRunning := context.WithCancel(context.Background())
	stoppedSendRunning := make(chan struct{}, 1)
	go func(ctx context.Context, stoppedSendRunning chan<- struct{}) {
		// Closing the channel is what unblocks the wait after SyncRun.
		defer close(stoppedSendRunning)
		// Reset through sync/atomic: every other access to this counter in
		// this function is atomic, and mixing plain and atomic access to the
		// same word is a data race.
		atomic.StoreUint32(&task.data_sended, 0)
		// Running output is not needed to be reported during invocation of
		// periodic tasks. But stoppedSendRunning channel is still needed to be
		// closed correctly.
		if task.taskInfo.Cronat != "" {
			return
		}

		// Report no more often than once per second.
		intervalMs := task.taskInfo.Output.Interval
		if intervalMs < 1000 {
			intervalMs = 1000
		}
		ticker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
		lastReportOutputTime := time.Now()
		defer ticker.Stop()
		for {
			// serve the stop signal from context channel with higher priority
			select {
			case <-ctx.Done():
				return
			default:
				// fallthrough to the next select
			}

			select {
			case <-ticker.C:
				outputPre := task.outputBuf.ReadPre()
				if reported := task.sendRunningOutput(string(outputPre), lastReportOutputTime); reported {
					lastReportOutputTime = time.Now()
				}
				if len(outputPre) > 0 {
					atomic.AddUint32(&task.data_sended, uint32(len(outputPre)))
					taskLogger.Infof("Running output sent: %d bytes", atomic.LoadUint32(&task.data_sended))
				} else {
					taskLogger.Infof("Running output sent: %d bytes, just report running no output sent", atomic.LoadUint32(&task.data_sended))
				}
			case <-ctx.Done():
				return
			}
		}
	}(ctx, stoppedSendRunning)

	taskLogger.Info("Start command process")
	var status int
	// The same writer receives both stdout and stderr of the command.
	task.exit_code, status, err = task.processer.SyncRun(stdouterrWriter, stdouterrWriter, nil)
	switch status {
	case process.Success:
		taskLogger.WithFields(logrus.Fields{
			"exitcode":   task.exit_code,
			"extraError": err,
		}).Info("Finished command process")
	case process.Timeout:
		taskLogger.WithFields(logrus.Fields{
			"attchedError": err,
		}).Info("Terminated command process due to timeout")
	case process.Fail:
		taskLogger.WithError(err).Info("Failed command process")
	default:
		taskLogger.WithFields(logrus.Fields{
			"exitcode":     task.exit_code,
			"status":       status,
			"attchedError": err,
		}).Warn("Ended command process with unexpected status")
	}

	// That is, send stopping message to the goroutine sending running output
	stopSendRunning()
	// Wait for the goroutine sending running output to exit
	<-stoppedSendRunning
	task.endTime = time.Now()
	task.monotonicEndTimestamp = timetool.ToAccurateTime(timetool.ToStableElapsedTime(task.endTime, task.startTime).Local())

	// ---- Phase: final report --------------------------------------------
	// Read the remaining output only after the reporter goroutine has exited.
	postOutput := string(task.outputBuf.ReadAll())
	task.droped = task.outputBuf.Dropped()

	switch status {
	case process.Fail:
		// A failure without an error is reported as plain "failed" output;
		// otherwise the error type selects the code and message to report.
		switch runErr := err.(type) {
		case nil:
			task.sendOutput("failed", postOutput)
		case taskerrors.NormalizedExecutionError:
			task.SendError(postOutput, taskerrors.Stringer(runErr.Code()), runErr.Description())
		case taskerrors.ExecutionError:
			task.SendError(postOutput, runErr.ErrCode(), runErr.Error())
		default:
			task.SendError(postOutput, taskerrors.WrapErrExecuteScriptFailed, fmt.Sprintf("ExecuteScriptFailed: %s", err.Error()))
		}
	case process.Timeout:
		task.sendOutput("timeout", postOutput)
	default:
		// A cancelled task does not get a "finished" report from here.
		if !task.IsCancled() {
			task.sendOutput("finished", postOutput)
		}
	}
	endTaskLogger := log.GetLogger().WithFields(logrus.Fields{
		"TaskId":        task.taskInfo.TaskId,
		"InvokeVersion": task.taskInfo.InvokeVersion,
		"Phase":         "Ending",
	})
	endTaskLogger.Info("Sent final output and state")

	// ---- Phase: teardown -------------------------------------------------
	// Failures below are logged only; they do not change the return value.
	if err := task.outputBuf.Uninit(); err != nil {
		endTaskLogger.Error("Task outputbuffer err: ", err)
	} else {
		endTaskLogger.Info("Clean task output")
	}

	if err := task.processer.Cleanup(!flagging.GetTaskKeepScriptFile()); err != nil {
		endTaskLogger.WithError(err).Errorln("Failed to cleanup after command finished")
	}

	// Perform instructed poweroff/reboot action after task finished
	if err := task.processer.SideEffect(); err != nil {
		endTaskLogger.WithError(err).Errorln("Failed to apply side-effect of command after finished")
	}

	return 0, nil
}