in agent/taskengine/scheduletask.go [218:313]
func dispatchRunTask(taskInfo models.RunTaskInfo) {
fetchLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": taskInfo.TaskId,
"InvokeVersion": taskInfo.InvokeVersion,
"Phase": "Fetched",
})
fetchLogger.Info("Fetched to be run")
taskFactory := GetTaskFactory()
var existedTask *Task
if existedTask, _ = taskFactory.GetTask(taskInfo.TaskId); existedTask == nil {
existedTask = getPeriodicTask(taskInfo.TaskId)
}
if existedTask != nil {
if existedTask.taskInfo.InvokeVersion != taskInfo.InvokeVersion {
// Task existed but with different InvokeVersion needs rehandle
fetchLogger.Infof("Existed task with InvokeVersion[%d] needs rehandle",
existedTask.taskInfo.InvokeVersion)
switch taskInfo.Repeat {
case models.RunTaskCron, models.RunTaskRate, models.RunTaskAt:
fetchLogger.Infof("Cancel periodic task with InvokeVersion[%d] quietly", existedTask.taskInfo.InvokeVersion)
cancelPeriodicTask(existedTask.taskInfo, true)
default:
fetchLogger.Warning("Existed task is not Period. New task is duplicately fetched, ignore it")
return
}
} else {
// Tasks should not be duplicately handled
fetchLogger.Warning("Ignored duplicately fetched task")
return
}
}
// Reuse specified logger across task scheduling phase
scheduleLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": taskInfo.TaskId,
"InvokeVersion": taskInfo.InvokeVersion,
"Phase": "Scheduling",
})
// Verify task signature
if ok, err := signature.VerifyTaskSign(scheduleLogger, taskInfo); err != nil {
scheduleLogger.WithError(err).Error("Verify task signature error")
} else if !ok {
scheduleLogger.Error("Task signature is invalid.")
reportInvalidTask(taskInfo.TaskId, taskInfo.InvokeVersion, invalidSignature, "Signature verification failed", "")
return
} else {
scheduleLogger.Info("Task signature is OK.")
}
switch taskInfo.Repeat {
case models.RunTaskOnce, models.RunTaskNextRebootOnly, models.RunTaskEveryReboot:
t, err := NewTask(taskInfo, nil, nil, onTaskReportError)
if err != nil {
scheduleLogger.Error("Invalid task: ", err)
return
}
scheduleLogger.Info("Schedule non-periodic task")
// Non-periodic tasks are managed by TaskFactory
if err := taskFactory.AddTask(t); err != nil {
scheduleLogger.Error("Add task failed: ", err.Error())
return
}
pool := GetDispatcher()
pool.PutTask(func() {
code, err := t.Run()
if code != 0 || err != nil {
metrics.GetTaskFailedEvent(
"taskid", t.taskInfo.TaskId,
"InvokeVersion", strconv.Itoa(t.taskInfo.InvokeVersion),
"errormsg", err.Error(),
"reason", strconv.Itoa(int(code)),
).ReportEvent()
}
taskFactory := GetTaskFactory()
taskFactory.RemoveTaskByName(t.taskInfo.TaskId)
})
scheduleLogger.Info("Scheduled for pending or running")
case models.RunTaskCron, models.RunTaskRate, models.RunTaskAt:
// Periodic tasks are managed by _periodicTaskSchedules
err := schedulePeriodicTask(taskInfo)
if err != nil {
scheduleLogger.WithFields(logrus.Fields{
"taskInfo": taskInfo,
}).WithError(err).Errorln("Failed to schedule periodic task")
} else {
scheduleLogger.Infoln("Succeed to schedule periodic task")
}
default:
scheduleLogger.WithFields(logrus.Fields{
"taskInfo": taskInfo,
}).Errorln("Unknown repeat type")
}
}