func dispatchRunTask()

in agent/taskengine/scheduletask.go [218:313]


// dispatchRunTask handles a single fetched run-task request described by
// taskInfo.
//
// Steps, in order:
//  1. Look for an already known task with the same TaskId, first in the
//     TaskFactory and then among the periodic tasks. A task with the same
//     InvokeVersion is ignored as a duplicate. A task with a different
//     InvokeVersion is cancelled quietly and re-scheduled only when the new
//     taskInfo.Repeat is cron/rate/at; otherwise the new request is ignored.
//  2. Verify the task signature. An explicitly invalid signature is reported
//     via reportInvalidTask and the task is dropped.
//  3. Schedule by taskInfo.Repeat: once/next-reboot/every-reboot tasks are
//     added to the TaskFactory and queued on the dispatcher; cron/rate/at
//     tasks go through schedulePeriodicTask; anything else is logged as an
//     unknown repeat type.
//
// The function returns nothing; every outcome is reported through logging.
func dispatchRunTask(taskInfo models.RunTaskInfo) {
	fetchLogger := log.GetLogger().WithFields(logrus.Fields{
		"TaskId":        taskInfo.TaskId,
		"InvokeVersion": taskInfo.InvokeVersion,
		"Phase":         "Fetched",
	})
	fetchLogger.Info("Fetched to be run")

	taskFactory := GetTaskFactory()
	// Fall back to the periodic task registry when the factory does not know
	// the task. The error returned by GetTask is intentionally ignored: only
	// the presence of a task matters here.
	var existedTask *Task
	if existedTask, _ = taskFactory.GetTask(taskInfo.TaskId); existedTask == nil {
		existedTask = getPeriodicTask(taskInfo.TaskId)
	}
	if existedTask != nil {
		if existedTask.taskInfo.InvokeVersion != taskInfo.InvokeVersion {
			// Task existed but with different InvokeVersion needs rehandle
			fetchLogger.Infof("Existed task with InvokeVersion[%d] needs rehandle",
				existedTask.taskInfo.InvokeVersion)
			switch taskInfo.Repeat {
			case models.RunTaskCron, models.RunTaskRate, models.RunTaskAt:
				// Cancel the old version, then fall through to the scheduling
				// code below so the new version replaces it.
				fetchLogger.Infof("Cancel periodic task with InvokeVersion[%d] quietly", existedTask.taskInfo.InvokeVersion)
				cancelPeriodicTask(existedTask.taskInfo, true)
			default:
				fetchLogger.Warning("Existed task is not Period. New task is duplicately fetched, ignore it")
				return
			}
		} else {
			// Tasks should not be duplicately handled
			fetchLogger.Warning("Ignored duplicately fetched task")
			return
		}
	}

	// Reuse specified logger across task scheduling phase
	scheduleLogger := log.GetLogger().WithFields(logrus.Fields{
		"TaskId":        taskInfo.TaskId,
		"InvokeVersion": taskInfo.InvokeVersion,
		"Phase":         "Scheduling",
	})

	// Verify task signature.
	// NOTE(review): when verification itself returns an error the task is
	// only logged and still scheduled; just an explicit "invalid" result
	// rejects it. Confirm this leniency is intended.
	if ok, err := signature.VerifyTaskSign(scheduleLogger, taskInfo); err != nil {
		scheduleLogger.WithError(err).Error("Verify task signature error")
	} else if !ok {
		scheduleLogger.Error("Task signature is invalid.")
		reportInvalidTask(taskInfo.TaskId, taskInfo.InvokeVersion, invalidSignature, "Signature verification failed", "")
		return
	} else {
		scheduleLogger.Info("Task signature is OK.")
	}

	switch taskInfo.Repeat {
	case models.RunTaskOnce, models.RunTaskNextRebootOnly, models.RunTaskEveryReboot:
		t, err := NewTask(taskInfo, nil, nil, onTaskReportError)
		if err != nil {
			scheduleLogger.Error("Invalid task: ", err)
			return
		}

		scheduleLogger.Info("Schedule non-periodic task")
		// Non-periodic tasks are managed by TaskFactory
		if err := taskFactory.AddTask(t); err != nil {
			scheduleLogger.Error("Add task failed: ", err.Error())
			return
		}
		pool := GetDispatcher()
		pool.PutTask(func() {
			code, err := t.Run()
			if code != 0 || err != nil {
				// A failure may be signalled by a non-zero code alone, in
				// which case err is nil and calling err.Error() would panic
				// (and skip the factory cleanup below).
				errMsg := ""
				if err != nil {
					errMsg = err.Error()
				}
				metrics.GetTaskFailedEvent(
					"taskid", t.taskInfo.TaskId,
					"InvokeVersion", strconv.Itoa(t.taskInfo.InvokeVersion),
					"errormsg", errMsg,
					"reason", strconv.Itoa(int(code)),
				).ReportEvent()
			}
			// The task has finished running; drop it from the factory.
			taskFactory := GetTaskFactory()
			taskFactory.RemoveTaskByName(t.taskInfo.TaskId)
		})
		scheduleLogger.Info("Scheduled for pending or running")
	case models.RunTaskCron, models.RunTaskRate, models.RunTaskAt:
		// Periodic tasks are managed by _periodicTaskSchedules
		err := schedulePeriodicTask(taskInfo)
		if err != nil {
			scheduleLogger.WithFields(logrus.Fields{
				"taskInfo": taskInfo,
			}).WithError(err).Errorln("Failed to schedule periodic task")
		} else {
			scheduleLogger.Infoln("Succeed to schedule periodic task")
		}
	default:
		scheduleLogger.WithFields(logrus.Fields{
			"taskInfo": taskInfo,
		}).Errorln("Unknown repeat type")
	}
}