agent/statemanager/schedule.go (184 lines of code) (raw):

package statemanager import ( "fmt" "math/rand" "runtime" "runtime/debug" "strconv" "strings" "sync" "time" "github.com/aliyun/aliyun_assist_client/agent/clientreport" "github.com/aliyun/aliyun_assist_client/agent/log" "github.com/aliyun/aliyun_assist_client/agent/taskengine/timermanager" ) const ( // Cron schedule type, expression format is defined in https://help.aliyun.com/document_detail/169784.html Cron = "cron" // Rate schedule type, expression format examples: "5 minutes" "1 hour" Rate = "rate" // Cron is triggered with a drift, after the expected time // This helps to reduce concurrency MaxCronDriftSeconds = 15 * 60 ) type StateConfigTimer struct { timer *timermanager.Timer scheduleType string scheduleExpression string } var ( stateConfigTimers = map[string]*StateConfigTimer{} stateConfigTimersLock sync.Mutex stateConfigEnforceLock sync.Mutex ) func init() { rand.Seed(time.Now().UnixNano()) } func GetRateInSeconds(expr string) (seconds int, err error) { parts := strings.Split(expr, " ") if len(parts) != 2 { err = fmt.Errorf("wrong rate expression: %s", expr) return } seconds, err = strconv.Atoi(parts[0]) if err != nil { err = fmt.Errorf("wrong rate expression: %s", expr) return } unit := parts[1] switch unit { case "second", "seconds": seconds *= 1 case "minute", "minutes": seconds *= 60 case "hour", "hours": seconds *= 60 * 60 case "day", "days": seconds *= 60 * 60 * 24 default: err = fmt.Errorf("wrong unit in rate expression: %s", expr) return } return } func isScheduleChanged(config StateConfiguration) bool { stateConfigTimer, ok := stateConfigTimers[config.StateConfigurationId] if !ok { return true } if config.ScheduleType != stateConfigTimer.scheduleType || config.ScheduleExpression != stateConfigTimer.scheduleExpression { return true } return false } func createStateConfigCallBack(stateConfigId string) timermanager.TimerCallback { callback := func() { defer func() { if panicPayload := recover(); panicPayload != nil { stacktrace := debug.Stack() clientreport.ReportPanic(panicPayload, stacktrace, false) } }() config, ok := getStateConfig(stateConfigId) if !ok { log.GetLogger().Warnf("state configuration %s does not exist", stateConfigId) return } if config.ScheduleType == "cron" { driftMills := rand.Intn(MaxCronDriftSeconds * 1000) log.GetLogger().Infof("delay %d milliseconds for cron to reduce concurrency", driftMills) time.Sleep(time.Duration(driftMills) * time.Millisecond) } stateConfigEnforceLock.Lock() defer stateConfigEnforceLock.Unlock() err := enforce(config) if err != nil { log.GetLogger().WithError(err).Errorf("enforce state configuration %s fail", config.StateConfigurationId) } runtime.GC() } return callback } func setupStateConfigTimer(config StateConfiguration) (err error) { timerManager := timermanager.GetTimerManager() var timer *timermanager.Timer if config.ScheduleType == Rate { var intervalSeconds int intervalSeconds, err = GetRateInSeconds(config.ScheduleExpression) if err != nil { return } timer, err = timerManager.CreateTimerInSeconds(createStateConfigCallBack(config.StateConfigurationId), intervalSeconds) } else if config.ScheduleType == Cron { timer, err = timerManager.CreateCronTimer(createStateConfigCallBack(config.StateConfigurationId), config.ScheduleExpression) } else { err = fmt.Errorf("Invalid schedule type %s", config.ScheduleType) } if err != nil { return } _, err = timer.Run() stateConfgTimer := StateConfigTimer{timer, config.ScheduleType, config.ScheduleExpression} stateConfigTimers[config.StateConfigurationId] = &stateConfgTimer log.GetLogger().Infof("setup timer for %s", config.StateConfigurationId) return } func tearDownStateConfigTimer(stateConfigId string) (err error) { stateConfgTimer, ok := stateConfigTimers[stateConfigId] if !ok { return } timermanager.GetTimerManager().DeleteTimer(stateConfgTimer.timer) delete(stateConfigTimers, stateConfigId) log.GetLogger().Infof("tear down timer for %s", stateConfigId) return } func refreshStateConfigTimers(configs []StateConfiguration) (err error) { stateConfigTimersLock.Lock() defer stateConfigTimersLock.Unlock() var stateConfigIds = make([]string, len(configs)) for _, config := range configs { stateConfigIds = append(stateConfigIds, config.StateConfigurationId) refreshStateConfigTimer(config) } cleanupDeleted(stateConfigIds) return } func refreshStateConfigTimer(config StateConfiguration) (err error) { if isScheduleChanged(config) { log.GetLogger().Infof("%s schedule changed, refresh timer", config.StateConfigurationId) err = tearDownStateConfigTimer(config.StateConfigurationId) if err != nil { log.GetLogger().WithError(err).Error("tear down timer failed") } setupStateConfigTimer(config) if err != nil { log.GetLogger().WithError(err).Error("set up timer failed") } } else { log.GetLogger().Debugf("%s schedule not changed", config.StateConfigurationId) } return } func cleanupDeleted(existStateConfigIds []string) { for stateConfigId := range stateConfigTimers { var exist = false for _, existId := range existStateConfigIds { if existId == stateConfigId { exist = true break } } if !exist { tearDownStateConfigTimer(stateConfigId) } } } func IsStateConfigTimerRunning() bool { stateConfigTimersLock.Lock() defer stateConfigTimersLock.Unlock() if stateConfigTimers != nil { for _, t := range stateConfigTimers { if t.timer.IsRunning() { return true } } } return false }