in agent/taskengine/scheduletask.go [439:568]
// schedulePeriodicTask creates a timer for the periodic task described by
// taskInfo, binds a reusable invocation object to it, records the schedule in
// _periodicTaskSchedules and finally starts the timer.
//
// The timer kind follows taskInfo.Repeat: a rate timer for
// models.RunTaskRate, an at timer for models.RunTaskAt, and a cron timer for
// every other value. taskInfo.Cronat carries the expression in all cases.
//
// It returns nil when the timer has been started, or when a schedule with the
// same TaskId is already registered (the request is then ignored with a
// warning). It returns an error when the global TimerManager is missing, when
// the timer cannot be created from the expression (this is also reported via
// reportInvalidTask), when NewTask fails, or when timer.Run fails. On every
// failure that happens after the timer was created, DeleteTimer is called on
// it so that no orphaned timer stays behind in the TimerManager.
//
// The whole function body runs while holding _periodicTaskSchedulesLock.
func schedulePeriodicTask(taskInfo models.RunTaskInfo) error {
	timerManager := timermanager.GetTimerManager()
	if timerManager == nil {
		return errors.New("Global TimerManager instance is not initialized")
	}

	_periodicTaskSchedulesLock.Lock()
	defer _periodicTaskSchedulesLock.Unlock()

	// Reuse specified logger across task scheduling phase
	scheduleLogger := log.GetLogger().WithFields(logrus.Fields{
		"TaskId":        taskInfo.TaskId,
		"InvokeVersion": taskInfo.InvokeVersion,
		"Phase":         "Scheduling",
	})

	// 1. Check whether periodic task has been registered in local task storage,
	//    and had corresponding timer in timer manager
	if _, registered := _periodicTaskSchedules[taskInfo.TaskId]; registered {
		scheduleLogger.Warn("Ignore periodic task registered in local")
		return nil
	}

	// 2. Create PeriodicTaskSchedule object
	scheduleLogger.Info("Create timer of periodic task")
	periodicTaskSchedule := &PeriodicTaskSchedule{
		timer: nil,
		// Invocations of periodic task is not allowed to overlap, so Task struct
		// for invocation data can be reused.
		reusableInvocation: nil,
	}

	// 3. Create timer based on expression and register into TimerManager
	// NOTE: reusableInvocation is bound to the callback via closure; explicit
	// passing into the callback, like "data" for a traditional thread, might
	// be better.
	invoke := func() {
		periodicTaskSchedule.startExclusiveInvocation()
	}

	var (
		timer            *timermanager.Timer
		err              error
		scheduleLocation *time.Location
		onFinish         FinishCallback
	)
	switch taskInfo.Repeat {
	case models.RunTaskRate:
		// CreationTime is split into whole seconds and the leftover
		// milliseconds, the latter converted to nanoseconds for time.Unix.
		creationTimeSeconds := taskInfo.CreationTime / 1000
		creationTimeMs := taskInfo.CreationTime % 1000
		creationTime := time.Unix(creationTimeSeconds, creationTimeMs*int64(time.Millisecond))
		timer, err = timerManager.CreateRateTimer(invoke, taskInfo.Cronat, creationTime)
	case models.RunTaskAt:
		timer, err = timerManager.CreateAtTimer(invoke, taskInfo.Cronat)
	default:
		timer, err = timerManager.CreateCronTimer(invoke, taskInfo.Cronat)
	}
	if err != nil {
		// Report errors for invalid cron/rate/at expression
		var response string
		var reportErr error
		if cronParameterErr, ok := err.(timermanager.CronParameterError); ok {
			// Only report string constant code to luban
			response, reportErr = reportInvalidTask(taskInfo.TaskId, taskInfo.InvokeVersion, invalidParamCron, cronParameterErr.Code(), "")
		} else {
			response, reportErr = reportInvalidTask(taskInfo.TaskId, taskInfo.InvokeVersion, invalidParamCron, err.Error(), "")
		}
		scheduleLogger.WithFields(logrus.Fields{
			"expression": taskInfo.Cronat,
			"reportErr":  reportErr,
			"response":   response,
		}).WithError(err).Info("Report errors for invalid cron/rate/at expression")
		return err
	}

	// Special attributes for additional reporting of cron tasks
	if taskInfo.Repeat == models.RunTaskCron {
		cronScheduled, ok := timer.Schedule.(*timermanager.CronScheduled)
		if !ok {
			// Should never run into logic here
			errorMessage := "Unexpected schedule object when invoking onFinish callback for cron schedule!"
			scheduleLogger.Errorln(errorMessage)
			// The timer already exists at this point; drop it so that it does
			// not linger in the TimerManager without a registered schedule.
			timerManager.DeleteTimer(timer)
			return errors.New(errorMessage)
		}
		scheduleLocation = cronScheduled.Location()
		onFinish = func() {
			onFinishLogger := log.GetLogger().WithFields(logrus.Fields{
				"TaskId":        taskInfo.TaskId,
				"InvokeVersion": taskInfo.InvokeVersion,
				"Phase":         "onFinishCallback",
			})
			// The schedule is read from the timer again on every call rather
			// than captured from the enclosing scope.
			currentSchedule, isCron := timer.Schedule.(*timermanager.CronScheduled)
			if !isCron {
				// Should never run into logic here
				onFinishLogger.Errorln("Unexpected schedule object when invoking onFinish callback for cron schedule!")
				return
			}
			if currentSchedule.NoNextRun() {
				response, sendErr := sendStoppedOutput(taskInfo.TaskId, taskInfo.InvokeVersion, 0, 0, 0, 0, "", stopReasonCompleted, nil)
				onFinishLogger.WithFields(logrus.Fields{
					"response": response,
				}).WithError(sendErr).Infoln("Sent completion event for cron task on last invocation finished")
			}
		}
	}

	// then bind them to periodicTaskSchedule object
	periodicTaskSchedule.timer = timer
	periodicTaskSchedule.reusableInvocation, err = NewTask(taskInfo, scheduleLocation, onFinish, onTaskReportError)
	if err != nil {
		scheduleLogger.Info("Invalid task: ", err)
		// Same cleanup as above: the schedule is never registered, so the
		// timer must not stay behind in the TimerManager.
		timerManager.DeleteTimer(timer)
		return err
	}
	scheduleLogger.Info("Created timer and schedule object of periodic task")

	// 4. Register schedule object into _periodicTaskSchedules
	_periodicTaskSchedules[taskInfo.TaskId] = periodicTaskSchedule
	scheduleLogger.Info("Registered periodic task")

	// 5. Current API of TimerManager requires manual startup of timer
	scheduleLogger.Info("Run timer of periodic task")
	if _, err = timer.Run(); err != nil {
		scheduleLogger.WithError(err).Error("Failed to run timer of periodic task")
		// Roll back both the timer and the registration made in step 4.
		timerManager.DeleteTimer(periodicTaskSchedule.timer)
		delete(_periodicTaskSchedules, taskInfo.TaskId)
		return err
	}
	scheduleLogger.Info("Running timer of periodic task")
	return nil
}