agent/taskengine/scheduletask.go (511 lines of code) (raw):
package taskengine
import (
"errors"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/aliyun/aliyun_assist_client/thirdparty/sirupsen/logrus"
heavylock "github.com/viney-shih/go-lock"
"github.com/aliyun/aliyun_assist_client/agent/flagging"
"github.com/aliyun/aliyun_assist_client/agent/log"
"github.com/aliyun/aliyun_assist_client/agent/metrics"
"github.com/aliyun/aliyun_assist_client/agent/taskengine/models"
"github.com/aliyun/aliyun_assist_client/agent/taskengine/signature"
"github.com/aliyun/aliyun_assist_client/agent/taskengine/timermanager"
"github.com/aliyun/aliyun_assist_client/agent/util/atomicutil"
)
const (
ErrUpdatingProcedureRunning = -7
)
const (
NormalTaskType = 0
SessionTaskType = 1
)
// PeriodicTaskSchedule consists of timer and reusable invocation data structure
// for periodic task
type PeriodicTaskSchedule struct {
timer *timermanager.Timer
reusableInvocation *Task
}
var (
// FetchingTaskLock indicates whether one goroutine is fetching tasks
FetchingTaskLock heavylock.CASMutex
// FetchingTaskCounter indicates how many goroutines are fetching tasks
FetchingTaskCounter atomicutil.AtomicInt32
// Indicating whether is enabled to fetch tasks, ONLY operated by atomic operation
_neverDirectWrite_Atomic_FetchingTaskEnabled int32 = 0
_periodicTaskSchedules map[string]*PeriodicTaskSchedule
_periodicTaskSchedulesLock sync.Mutex
// Indicating whether startup fetch(reason=startup) has been done
_startupFetchedFinished bool
_startupFetchLock sync.Mutex
_startupFetchedDone chan struct{}
// Retry interval
fetchRetryInterval = time.Duration(3) * time.Second
)
func init() {
FetchingTaskLock = heavylock.NewCASMutex()
_periodicTaskSchedules = make(map[string]*PeriodicTaskSchedule)
_startupFetchedDone = make(chan struct{})
}
// EnableFetchingTask sets prviate indicator to allow fetching tasks
func EnableFetchingTask() {
atomic.StoreInt32(&_neverDirectWrite_Atomic_FetchingTaskEnabled, 1)
}
func isEnabledFetchingTask() bool {
state := atomic.LoadInt32(&_neverDirectWrite_Atomic_FetchingTaskEnabled)
return state != 0
}
func IsStartupFetched() bool {
return _startupFetchedFinished
}
func Fetch(from_kick bool, taskId string, taskType int) int {
logger := log.GetLogger().WithField("from_kick", from_kick)
// Fetching task should be allowed before all core components of agent have
// been correctly initialized. This critical indicator would be set at the
// end of program.run method
if !isEnabledFetchingTask() {
logger.Infoln("Fetching tasks is disabled due to network is not ready")
return 0
}
// NOTE: sync.Mutex from Go standard library does not support try-lock
// operation like std::mutex in C++ STL, which makes it slightly hard for
// goroutines of fetching tasks and checking updates to coopearate gracefully.
// Futhermore, it does not support try-lock operation with specified timeout,
// which makes it hard for goroutines of fetching tasks to wait in queue but
// just throw many message about lock accquisition failure confusing others.
// THUS heavy weight lock from github.com/viney-shih/go-lock library is used
// to provide graceful locking mechanism for goroutine coopeartion. The cost
// would be, some performance lost.
if !FetchingTaskLock.TryLockWithTimeout(time.Duration(2) * time.Second) {
logger.Infoln("Fetching tasks is canceled due to another running startupFetchRetrying or updating process.")
return ErrUpdatingProcedureRunning
}
// Immediately release fetchingTaskLock to let other goroutine fetching
// tasks go, but keep updating safe
FetchingTaskLock.Unlock()
// Increase fetchingTaskCounter to indicate there is a goroutine fetching
// tasks, which the updating goroutine MUST notice and decrease it to let
// updating goroutine go.
FetchingTaskCounter.Add(1)
defer FetchingTaskCounter.Add(-1)
var task_size int
var fetchErr error
// `isColdstart` only make sense for FetchOnStartup
isColdstart, _ := flagging.IsColdstart()
// Fetch tasks for kickoff if startup tasks has been fetched.
if _startupFetchedFinished {
task_size = fetchForKickOff(taskId, taskType, isColdstart)
return task_size
}
_startupFetchLock.Lock()
// Check again whether the startup tasks has been fetched.
if _startupFetchedFinished {
_startupFetchLock.Unlock()
task_size = fetchForKickOff(taskId, taskType, isColdstart)
return task_size
}
// Fetch tasks for startup.
task_size, fetchErr = fetchTasks(FetchOnStartup, taskId, taskType, isColdstart)
if fetchErr == nil {
_startupFetchedFinished = true
close(_startupFetchedDone)
_startupFetchLock.Unlock()
return task_size
}
_startupFetchLock.Unlock()
// The from_kick=false Fetch() is responsible for retrying.
// There should be only one Fetch() with from_kick=false.
if !from_kick {
// In order not to block Fetch(false, ...) a new goroutine to be created
// for continuous retrying.
go func() {
aontherFetchErr := fetchErr
ticker := time.NewTicker(fetchRetryInterval)
defer ticker.Stop()
for aontherFetchErr != nil {
select {
case <-ticker.C:
_startupFetchLock.Lock()
if _startupFetchedFinished {
_startupFetchLock.Unlock()
return
}
_, aontherFetchErr = fetchTasks(FetchOnStartup, taskId, taskType, isColdstart)
if aontherFetchErr == nil {
_startupFetchedFinished = true
close(_startupFetchedDone)
_startupFetchLock.Unlock()
return
}
_startupFetchLock.Unlock()
ticker.Reset(fetchRetryInterval)
case <-_startupFetchedDone:
// Another goroutine has finished fetching tasks for startup.
return
}
}
}()
}
return 0
}
func fetchForKickOff(taskId string, taskType int, isColdstart bool) (task_size int) {
task_size, _ = fetchTasks(FetchOnKickoff, taskId, taskType, isColdstart)
for i := 0; i < 1 && task_size == 0; i++ {
time.Sleep(fetchRetryInterval)
task_size, _ = fetchTasks(FetchOnKickoff, taskId, taskType, false)
}
return
}
func fetchTasks(reason FetchReason, taskId string, taskType int, isColdstart bool) (int, error) {
taskInfos, err := FetchTaskList(reason, taskId, taskType, isColdstart)
if err != nil {
return 0, err
}
SendFiles(taskInfos.sendFiles)
DoSessionTask(taskInfos.sessionInfos)
for _, v := range taskInfos.runInfos {
dispatchRunTask(v)
}
for _, v := range taskInfos.stopInfos {
dispatchStopTask(v)
}
for _, v := range taskInfos.testInfos {
dispatchTestTask(v)
}
return len(taskInfos.runInfos) + len(taskInfos.stopInfos) + len(taskInfos.sessionInfos) + len(taskInfos.sendFiles), nil
}
func dispatchRunTask(taskInfo models.RunTaskInfo) {
fetchLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": taskInfo.TaskId,
"InvokeVersion": taskInfo.InvokeVersion,
"Phase": "Fetched",
})
fetchLogger.Info("Fetched to be run")
taskFactory := GetTaskFactory()
var existedTask *Task
if existedTask, _ = taskFactory.GetTask(taskInfo.TaskId); existedTask == nil {
existedTask = getPeriodicTask(taskInfo.TaskId)
}
if existedTask != nil {
if existedTask.taskInfo.InvokeVersion != taskInfo.InvokeVersion {
// Task existed but with different InvokeVersion needs rehandle
fetchLogger.Infof("Existed task with InvokeVersion[%d] needs rehandle",
existedTask.taskInfo.InvokeVersion)
switch taskInfo.Repeat {
case models.RunTaskCron, models.RunTaskRate, models.RunTaskAt:
fetchLogger.Infof("Cancel periodic task with InvokeVersion[%d] quietly", existedTask.taskInfo.InvokeVersion)
cancelPeriodicTask(existedTask.taskInfo, true)
default:
fetchLogger.Warning("Existed task is not Period. New task is duplicately fetched, ignore it")
return
}
} else {
// Tasks should not be duplicately handled
fetchLogger.Warning("Ignored duplicately fetched task")
return
}
}
// Reuse specified logger across task scheduling phase
scheduleLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": taskInfo.TaskId,
"InvokeVersion": taskInfo.InvokeVersion,
"Phase": "Scheduling",
})
// Verify task signature
if ok, err := signature.VerifyTaskSign(scheduleLogger, taskInfo); err != nil {
scheduleLogger.WithError(err).Error("Verify task signature error")
} else if !ok {
scheduleLogger.Error("Task signature is invalid.")
reportInvalidTask(taskInfo.TaskId, taskInfo.InvokeVersion, invalidSignature, "Signature verification failed", "")
return
} else {
scheduleLogger.Info("Task signature is OK.")
}
switch taskInfo.Repeat {
case models.RunTaskOnce, models.RunTaskNextRebootOnly, models.RunTaskEveryReboot:
t, err := NewTask(taskInfo, nil, nil, onTaskReportError)
if err != nil {
scheduleLogger.Error("Invalid task: ", err)
return
}
scheduleLogger.Info("Schedule non-periodic task")
// Non-periodic tasks are managed by TaskFactory
if err := taskFactory.AddTask(t); err != nil {
scheduleLogger.Error("Add task failed: ", err.Error())
return
}
pool := GetDispatcher()
pool.PutTask(func() {
code, err := t.Run()
if code != 0 || err != nil {
metrics.GetTaskFailedEvent(
"taskid", t.taskInfo.TaskId,
"InvokeVersion", strconv.Itoa(t.taskInfo.InvokeVersion),
"errormsg", err.Error(),
"reason", strconv.Itoa(int(code)),
).ReportEvent()
}
taskFactory := GetTaskFactory()
taskFactory.RemoveTaskByName(t.taskInfo.TaskId)
})
scheduleLogger.Info("Scheduled for pending or running")
case models.RunTaskCron, models.RunTaskRate, models.RunTaskAt:
// Periodic tasks are managed by _periodicTaskSchedules
err := schedulePeriodicTask(taskInfo)
if err != nil {
scheduleLogger.WithFields(logrus.Fields{
"taskInfo": taskInfo,
}).WithError(err).Errorln("Failed to schedule periodic task")
} else {
scheduleLogger.Infoln("Succeed to schedule periodic task")
}
default:
scheduleLogger.WithFields(logrus.Fields{
"taskInfo": taskInfo,
}).Errorln("Unknown repeat type")
}
}
func dispatchStopTask(taskInfo models.RunTaskInfo) {
log.GetLogger().WithFields(logrus.Fields{
"TaskId": taskInfo.TaskId,
"InvokeVersion": taskInfo.InvokeVersion,
"Phase": "Fetched",
}).Info("Fetched to be canceled")
cancelLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": taskInfo.TaskId,
"InvokeVersion": taskInfo.InvokeVersion,
"Phase": "Cancelling",
})
taskFactory := GetTaskFactory()
switch taskInfo.Repeat {
case models.RunTaskOnce, models.RunTaskNextRebootOnly, models.RunTaskEveryReboot:
scheduledTask, ok := taskFactory.GetTask(taskInfo.TaskId)
if ok {
cancelLogger.Info("Cancel task and invocation")
scheduledTask.Cancel(false, ok)
cancelLogger.Info("Canceled task and invocation")
} else {
response, err := sendStoppedOutput(taskInfo.TaskId, taskInfo.InvokeVersion, 0, 0, 0, 0, "", stopReasonKilled, nil)
cancelLogger.WithFields(logrus.Fields{
"response": response,
}).WithError(err).Warning("Force cancelling task not found due to finished or error")
}
case models.RunTaskCron, models.RunTaskRate, models.RunTaskAt:
// Periodic tasks are managed by _periodicTaskSchedules
err := cancelPeriodicTask(taskInfo, false)
if err != nil {
cancelLogger.WithFields(logrus.Fields{
"taskInfo": taskInfo,
}).WithError(err).Errorln("Failed to cancel periodic task")
} else {
cancelLogger.Infoln("Succeed to cancel periodic task")
}
default:
cancelLogger.WithFields(logrus.Fields{
"taskInfo": taskInfo,
}).Errorln("Unknown repeat type")
}
}
func dispatchTestTask(taskInfo models.RunTaskInfo) {
fetchLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": taskInfo.TaskId,
"InvokeVersion": taskInfo.InvokeVersion,
"Phase": "Fetched",
})
fetchLogger.Info("Fetched to be run")
taskFactory := GetTaskFactory()
// Tasks should not be duplicately handled
if taskFactory.ContainsTaskByName(taskInfo.TaskId) {
fetchLogger.Warning("Ignored duplicately fetched task")
return
}
// Reuse specified logger across task scheduling phase
scheduleLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": taskInfo.TaskId,
"InvokeVersion": taskInfo.InvokeVersion,
"Phase": "Scheduling",
})
switch taskInfo.Repeat {
case models.RunTaskOnce, models.RunTaskCron, models.RunTaskNextRebootOnly, models.RunTaskEveryReboot, models.RunTaskRate, models.RunTaskAt:
t, err := NewTask(taskInfo, nil, nil, onTaskReportError)
if err != nil {
scheduleLogger.Info("Invalid task: ", err)
return
}
scheduleLogger.Info("Schedule testing task to be pre-checked")
pool := GetDispatcher()
pool.PutTask(func() {
t.PreCheck(true)
})
scheduleLogger.Info("Scheduled testing task to be pre-checked")
default:
scheduleLogger.WithFields(logrus.Fields{
"taskInfo": taskInfo,
}).Errorln("Unknown repeat type")
}
}
func (s *PeriodicTaskSchedule) startExclusiveInvocation() {
// Reuse specified logger across task scheduling phase
invocateLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": s.reusableInvocation.taskInfo.TaskId,
"InvokeVersion": s.reusableInvocation.taskInfo.InvokeVersion,
"Phase": "PeriodicInvocating",
})
// NOTE: TaskPool has been closely wired with TaskFactory, thus:
taskFactory := GetTaskFactory()
// (3) Existed invocation in TaskFactory means task is running.
if taskFactory.ContainsTaskByName(s.reusableInvocation.taskInfo.TaskId) {
invocateLogger.Warn("Skip invocation since overlapped with existing invocation")
return
}
invocateLogger.Info("Schedule new invocation of periodic task")
// (2) Every time of invocation need to add itself into TaskFactory at first.
taskFactory.AddTask(s.reusableInvocation)
pool := GetDispatcher()
pool.PutTask(func() {
// reusableInvocation may be canceled in previous Run,
// reusableInvocation.ResetCancel() is called to reset the canceled flag.
s.reusableInvocation.ResetCancel()
code, err := s.reusableInvocation.Run()
if code != 0 || err != nil {
metrics.GetTaskFailedEvent(
"taskid", s.reusableInvocation.taskInfo.TaskId,
"InvokeVersion", strconv.Itoa(s.reusableInvocation.taskInfo.InvokeVersion),
"errormsg", err.Error(),
"reason", strconv.Itoa(int(code)),
).ReportEvent()
}
taskFactory := GetTaskFactory()
taskFactory.RemoveTaskByName(s.reusableInvocation.taskInfo.TaskId)
})
invocateLogger.Info("Scheduled new pending or running invocation")
}
func schedulePeriodicTask(taskInfo models.RunTaskInfo) error {
timerManager := timermanager.GetTimerManager()
if timerManager == nil {
return errors.New("Global TimerManager instance is not initialized")
}
_periodicTaskSchedulesLock.Lock()
defer _periodicTaskSchedulesLock.Unlock()
// Reuse specified logger across task scheduling phase
scheduleLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": taskInfo.TaskId,
"InvokeVersion": taskInfo.InvokeVersion,
"Phase": "Scheduling",
})
// 1. Check whether periodic task has been registered in local task storage,
// and had corresponding timer in timer manager
_, ok := _periodicTaskSchedules[taskInfo.TaskId]
if ok {
scheduleLogger.Warn("Ignore periodic task registered in local")
return nil
}
// 2. Create PeriodicTaskSchedule object
scheduleLogger.Info("Create timer of periodic task")
periodicTaskSchedule := &PeriodicTaskSchedule{
timer: nil,
// Invocations of periodic task is not allowed to overlap, so Task struct
// for invocation data can be reused.
reusableInvocation: nil,
}
// 3. Create timer based on expression and register into TimerManager
// NOTE: reusableInvocation is binded to callback via closure feature of golang,
// maybe explicit passing into callback like "data" for traditional thread
// would be better
var timer *timermanager.Timer
var err error
var scheduleLocation *time.Location = nil
var onFinish FinishCallback = nil
if taskInfo.Repeat == models.RunTaskRate {
creationTimeSeconds := taskInfo.CreationTime / 1000
creationTimeMs := taskInfo.CreationTime % 1000
creationTime := time.Unix(creationTimeSeconds, creationTimeMs*int64(time.Millisecond))
timer, err = timerManager.CreateRateTimer(func() {
periodicTaskSchedule.startExclusiveInvocation()
}, taskInfo.Cronat, creationTime)
} else if taskInfo.Repeat == models.RunTaskAt {
timer, err = timerManager.CreateAtTimer(func() {
periodicTaskSchedule.startExclusiveInvocation()
}, taskInfo.Cronat)
} else {
timer, err = timerManager.CreateCronTimer(func() {
periodicTaskSchedule.startExclusiveInvocation()
}, taskInfo.Cronat)
}
if err != nil {
// Report errors for invalid cron/rate/at expression
var response string
var reportErr error
if cronParameterErr, ok := err.(timermanager.CronParameterError); ok {
// Only report string constant code to luban
response, reportErr = reportInvalidTask(taskInfo.TaskId, taskInfo.InvokeVersion, invalidParamCron, cronParameterErr.Code(), "")
} else {
response, reportErr = reportInvalidTask(taskInfo.TaskId, taskInfo.InvokeVersion, invalidParamCron, err.Error(), "")
}
scheduleLogger.WithFields(logrus.Fields{
"expression": taskInfo.Cronat,
"reportErr": reportErr,
"response": response,
}).WithError(err).Info("Report errors for invalid cron/rate/at expression")
return err
}
// Special attributes for additional reporting of cron tasks
if taskInfo.Repeat == models.RunTaskCron {
cronScheduled, ok := timer.Schedule.(*timermanager.CronScheduled)
if !ok {
// Should never run into logic here
errorMessage := "Unexpected schedule object when invoking onFinish callback for cron schedule!"
scheduleLogger.Errorln(errorMessage)
return errors.New(errorMessage)
}
scheduleLocation = cronScheduled.Location()
onFinish = func() {
onFinishLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": taskInfo.TaskId,
"InvokeVersion": taskInfo.InvokeVersion,
"Phase": "onFinishCallback",
})
cronScheduled, ok := timer.Schedule.(*timermanager.CronScheduled)
if !ok {
// Should never run into logic here
onFinishLogger.Errorln("Unexpected schedule object when invoking onFinish callback for cron schedule!")
return
}
if cronScheduled.NoNextRun() {
response, err := sendStoppedOutput(taskInfo.TaskId, taskInfo.InvokeVersion, 0, 0, 0, 0, "", stopReasonCompleted, nil)
onFinishLogger.WithFields(logrus.Fields{
"response": response,
}).WithError(err).Infoln("Sent completion event for cron task on last invocation finished")
}
}
}
// then bind them to periodicTaskSchedule object
periodicTaskSchedule.timer = timer
periodicTaskSchedule.reusableInvocation, err = NewTask(taskInfo, scheduleLocation, onFinish, onTaskReportError)
if err != nil {
scheduleLogger.Info("Invalid task: ", err)
return err
}
scheduleLogger.Info("Created timer and schedule object of periodic task")
// 4. Register schedule object into _periodicTaskSchedules
_periodicTaskSchedules[taskInfo.TaskId] = periodicTaskSchedule
scheduleLogger.Info("Registered periodic task")
// 5. Current API of TimerManager requires manual startup of timer
scheduleLogger.Info("Run timer of periodic task")
_, err = timer.Run()
if err != nil {
timerManager.DeleteTimer(periodicTaskSchedule.timer)
delete(_periodicTaskSchedules, taskInfo.TaskId)
return err
}
scheduleLogger.Info("Running timer of periodic task")
return nil
}
// Cancel periodic task. If quietly is false, notify server the task is canceled.
func cancelPeriodicTask(taskInfo models.RunTaskInfo, quietly bool) error {
timerManager := timermanager.GetTimerManager()
if timerManager == nil {
return errors.New("Global TimerManager instance is not initialized")
}
_periodicTaskSchedulesLock.Lock()
defer _periodicTaskSchedulesLock.Unlock()
cancelLogger := log.GetLogger().WithFields(logrus.Fields{
"TaskId": taskInfo.TaskId,
"InvokeVersion": taskInfo.InvokeVersion,
"Quietly": quietly,
"Phase": "Cancelling",
})
// 1. Check whether task is registered in local storage
periodicTaskSchedule, ok := _periodicTaskSchedules[taskInfo.TaskId]
if !ok {
response, err := sendStoppedOutput(taskInfo.TaskId, taskInfo.InvokeVersion, 0, 0, 0, 0, "", stopReasonKilled, nil)
cancelLogger.WithFields(logrus.Fields{
"response": response,
}).WithError(err).Warning("Force cancelling periodic task unregistered due to finished or previous errors")
return nil
}
// 2. Delete timer of periodic task from TimerManager, which contains stopping
// timer operation
timerManager.DeleteTimer(periodicTaskSchedule.timer)
cancelLogger.Infof("Stop and remove timer of periodic task")
// 3. Delete registered task record from local storage
delete(_periodicTaskSchedules, taskInfo.TaskId)
cancelLogger.Infof("Deregistered periodic task")
// 4. Cancel periodic task, send ACK if invocation is existing
_, ok = GetTaskFactory().GetTask(taskInfo.TaskId)
cancelLogger.WithField("stillRunning", ok).Info("Cancel periodic task")
err := periodicTaskSchedule.reusableInvocation.Cancel(quietly, ok)
if err != nil {
cancelLogger.WithError(err).Error("Canceled periodic task failed")
} else {
cancelLogger.Infof("Canceled periodic task")
}
return nil
}
func getPeriodicTask(taskName string) *Task {
_periodicTaskSchedulesLock.Lock()
defer _periodicTaskSchedulesLock.Unlock()
periodicTaskSchedule, ok := _periodicTaskSchedules[taskName]
if ok {
return periodicTaskSchedule.reusableInvocation
}
return nil
}
func deletePeriodicTask(taskId string) {
timerManager := timermanager.GetTimerManager()
if timerManager == nil {
return
}
_periodicTaskSchedulesLock.Lock()
defer _periodicTaskSchedulesLock.Unlock()
log.GetLogger().WithField("taskId", taskId).Info("Delete periodic task")
if schedul, ok := _periodicTaskSchedules[taskId]; ok {
timerManager.DeleteTimer(schedul.timer)
}
delete(_periodicTaskSchedules, taskId)
}
// If server returns an errorCode in the response of task/xxx api, we need to do
// some actions like deleting periodic task.
func onTaskReportError(taskId string, repeat models.RunTaskRepeatType, errorCode, status string) (isTaskErr bool) {
isTaskErr = true
switch errorCode {
case TaskReportErrTaskNotFound:
// Delete periodic task
if repeat == models.RunTaskCron || repeat == models.RunTaskRate || repeat == models.RunTaskAt {
deletePeriodicTask(taskId)
}
case TaskReportErrTaskStatusInvalid:
// Delete periodic task if backend server tells us task is terminated
if status == TaskInvalidStatusTerminated && (repeat == models.RunTaskCron || repeat == models.RunTaskRate || repeat == models.RunTaskAt) {
deletePeriodicTask(taskId)
}
default:
isTaskErr = false
}
return
}