func schedulePeriodicTask()

in agent/taskengine/scheduletask.go [439:568]


func schedulePeriodicTask(taskInfo models.RunTaskInfo) error {
	timerManager := timermanager.GetTimerManager()
	if timerManager == nil {
		return errors.New("Global TimerManager instance is not initialized")
	}

	_periodicTaskSchedulesLock.Lock()
	defer _periodicTaskSchedulesLock.Unlock()

	// Reuse specified logger across task scheduling phase
	scheduleLogger := log.GetLogger().WithFields(logrus.Fields{
		"TaskId":        taskInfo.TaskId,
		"InvokeVersion": taskInfo.InvokeVersion,
		"Phase":         "Scheduling",
	})

	// 1. Check whether periodic task has been registered in local task storage,
	// and had corresponding timer in timer manager
	_, ok := _periodicTaskSchedules[taskInfo.TaskId]
	if ok {
		scheduleLogger.Warn("Ignore periodic task registered in local")
		return nil
	}

	// 2. Create PeriodicTaskSchedule object
	scheduleLogger.Info("Create timer of periodic task")
	periodicTaskSchedule := &PeriodicTaskSchedule{
		timer: nil,
		// Invocations of periodic task is not allowed to overlap, so Task struct
		// for invocation data can be reused.
		reusableInvocation: nil,
	}
	// 3. Create timer based on expression and register into TimerManager
	// NOTE: reusableInvocation is binded to callback via closure feature of golang,
	// maybe explicit passing into callback like "data" for traditional thread
	// would be better
	var timer *timermanager.Timer
	var err error
	var scheduleLocation *time.Location = nil
	var onFinish FinishCallback = nil
	if taskInfo.Repeat == models.RunTaskRate {
		creationTimeSeconds := taskInfo.CreationTime / 1000
		creationTimeMs := taskInfo.CreationTime % 1000
		creationTime := time.Unix(creationTimeSeconds, creationTimeMs*int64(time.Millisecond))
		timer, err = timerManager.CreateRateTimer(func() {
			periodicTaskSchedule.startExclusiveInvocation()
		}, taskInfo.Cronat, creationTime)
	} else if taskInfo.Repeat == models.RunTaskAt {
		timer, err = timerManager.CreateAtTimer(func() {
			periodicTaskSchedule.startExclusiveInvocation()
		}, taskInfo.Cronat)
	} else {
		timer, err = timerManager.CreateCronTimer(func() {
			periodicTaskSchedule.startExclusiveInvocation()
		}, taskInfo.Cronat)
	}
	if err != nil {
		// Report errors for invalid cron/rate/at expression
		var response string
		var reportErr error
		if cronParameterErr, ok := err.(timermanager.CronParameterError); ok {
			// Only report string constant code to luban
			response, reportErr = reportInvalidTask(taskInfo.TaskId, taskInfo.InvokeVersion, invalidParamCron, cronParameterErr.Code(), "")
		} else {
			response, reportErr = reportInvalidTask(taskInfo.TaskId, taskInfo.InvokeVersion, invalidParamCron, err.Error(), "")
		}
		scheduleLogger.WithFields(logrus.Fields{
			"expression": taskInfo.Cronat,
			"reportErr":  reportErr,
			"response":   response,
		}).WithError(err).Info("Report errors for invalid cron/rate/at expression")
		return err
	}
	// Special attributes for additional reporting of cron tasks
	if taskInfo.Repeat == models.RunTaskCron {
		cronScheduled, ok := timer.Schedule.(*timermanager.CronScheduled)
		if !ok {
			// Should never run into logic here
			errorMessage := "Unexpected schedule object when invoking onFinish callback for cron schedule!"
			scheduleLogger.Errorln(errorMessage)
			return errors.New(errorMessage)
		}
		scheduleLocation = cronScheduled.Location()
		onFinish = func() {
			onFinishLogger := log.GetLogger().WithFields(logrus.Fields{
				"TaskId":        taskInfo.TaskId,
				"InvokeVersion": taskInfo.InvokeVersion,
				"Phase":         "onFinishCallback",
			})

			cronScheduled, ok := timer.Schedule.(*timermanager.CronScheduled)
			if !ok {
				// Should never run into logic here
				onFinishLogger.Errorln("Unexpected schedule object when invoking onFinish callback for cron schedule!")
				return
			}

			if cronScheduled.NoNextRun() {
				response, err := sendStoppedOutput(taskInfo.TaskId, taskInfo.InvokeVersion, 0, 0, 0, 0, "", stopReasonCompleted, nil)
				onFinishLogger.WithFields(logrus.Fields{
					"response": response,
				}).WithError(err).Infoln("Sent completion event for cron task on last invocation finished")
			}
		}
	}
	// then bind them to periodicTaskSchedule object
	periodicTaskSchedule.timer = timer
	periodicTaskSchedule.reusableInvocation, err = NewTask(taskInfo, scheduleLocation, onFinish, onTaskReportError)
	if err != nil {
		scheduleLogger.Info("Invalid task: ", err)
		return err
	}
	scheduleLogger.Info("Created timer and schedule object of periodic task")

	// 4. Register schedule object into _periodicTaskSchedules
	_periodicTaskSchedules[taskInfo.TaskId] = periodicTaskSchedule
	scheduleLogger.Info("Registered periodic task")

	// 5. Current API of TimerManager requires manual startup of timer
	scheduleLogger.Info("Run timer of periodic task")
	_, err = timer.Run()
	if err != nil {
		timerManager.DeleteTimer(periodicTaskSchedule.timer)
		delete(_periodicTaskSchedules, taskInfo.TaskId)
		return err
	}
	scheduleLogger.Info("Running timer of periodic task")

	return nil
}