pkg/jobmgr/goalstate/driver.go (380 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package goalstate import ( "context" "sync" "sync/atomic" "time" "github.com/uber/peloton/.gen/peloton/api/v0/job" "github.com/uber/peloton/.gen/peloton/api/v0/peloton" "github.com/uber/peloton/.gen/peloton/api/v0/task" "github.com/uber/peloton/.gen/peloton/private/resmgrsvc" "github.com/uber/peloton/pkg/common" "github.com/uber/peloton/pkg/common/api" "github.com/uber/peloton/pkg/common/goalstate" "github.com/uber/peloton/pkg/common/recovery" "github.com/uber/peloton/pkg/jobmgr/cached" "github.com/uber/peloton/pkg/jobmgr/task/lifecyclemgr" "github.com/uber/peloton/pkg/storage" ormobjects "github.com/uber/peloton/pkg/storage/objects" log "github.com/sirupsen/logrus" "github.com/uber-go/tally" "github.com/uber/peloton/.gen/peloton/private/models" "go.uber.org/yarpc" "go.uber.org/yarpc/yarpcerrors" "golang.org/x/time/rate" ) // driverState indicates the running state of driver type driverState int32 const ( stopped driverState = iota + 1 stopping starting started ) // driverCacheState indicates the cache state of driver type driverCacheState int32 const ( cleaned driverCacheState = iota + 1 populated ) // Driver is the interface to enqueue jobs and tasks into the goal state engine // for evaluation and then run the corresponding actions. // The caller is also responsible for deleting from the goal state engine once // the job/task is untracked from the cache. type Driver interface { // EnqueueJob is used to enqueue a job into the goal state. It takes the job identifier // and the time at which the job should be evaluated by the goal state engine as inputs. EnqueueJob(jobID *peloton.JobID, deadline time.Time) // EnqueueTask is used to enqueue a task into the goal state. It takes the job identifier, // the instance identifier and the time at which the task should be evaluated by the // goal state engine as inputs. EnqueueTask(jobID *peloton.JobID, instanceID uint32, deadline time.Time) // EnqueueUpdate is used to enqueue a job update into the goal state. As // its input, it takes the job identifier, update identifier, and the // time at which the job update should be evaluated by the // goal state engine. EnqueueUpdate( jobID *peloton.JobID, updateID *peloton.UpdateID, deadline time.Time, ) // DeleteJob deletes the job state from the goal state engine. DeleteJob(jobID *peloton.JobID) // DeleteTask deletes the task state from the goal state engine. DeleteTask(jobID *peloton.JobID, instanceID uint32) // DeleteUpdate deletes the job update state from the goal state engine. DeleteUpdate(jobID *peloton.JobID, updateID *peloton.UpdateID) // IsScheduledTask is a helper function to check if a given task is scheduled // for evaluation in the goal state engine. IsScheduledTask(jobID *peloton.JobID, instanceID uint32) bool // JobRuntimeDuration returns the mimimum inter-run duration between job // runtime updates. This duration is different for batch and service jobs. JobRuntimeDuration(jobType job.JobType) time.Duration // Start is used to start processing items in the goal state engine. Start() // Stop is used to stop the goal state engine. // If cleanUpCache is set to true, then all of the cache would be removed, // and it would be recovered when start again. // If cleanUpCache is not set, cache would be kept in the memory, and start // would skip cache recovery. Stop(cleanUpCache bool) // Started returns true if goal state engine has finished start process Started() bool // GetLockable returns an interface which controls lock/unlock operations in goal state engine GetLockable() lifecyclemgr.Lockable } // NewDriver returns a new goal state driver object. func NewDriver( d *yarpc.Dispatcher, jobStore storage.JobStore, taskStore storage.TaskStore, volumeStore storage.PersistentVolumeStore, updateStore storage.UpdateStore, ormStore *ormobjects.Store, jobFactory cached.JobFactory, jobType job.JobType, parentScope tally.Scope, cfg Config, hmVersion api.Version, ) Driver { cfg.normalize() scope := parentScope.SubScope("goalstate") jobScope := scope.SubScope("job") taskScope := scope.SubScope("task") workflowScope := scope.SubScope("workflow") driver := &driver{ jobEngine: goalstate.NewEngine( cfg.NumWorkerJobThreads, cfg.FailureRetryDelay, cfg.MaxRetryDelay, jobScope), taskEngine: goalstate.NewEngine( cfg.NumWorkerTaskThreads, cfg.FailureRetryDelay, cfg.MaxRetryDelay, taskScope), updateEngine: goalstate.NewEngine( cfg.NumWorkerUpdateThreads, cfg.FailureRetryDelay, cfg.MaxRetryDelay, workflowScope), lm: lifecyclemgr.New(hmVersion, d, scope), resmgrClient: resmgrsvc.NewResourceManagerServiceYARPCClient( d.ClientConfig(common.PelotonResourceManager)), jobStore: jobStore, taskStore: taskStore, volumeStore: volumeStore, updateStore: updateStore, podEventsOps: ormobjects.NewPodEventsOps(ormStore), activeJobsOps: ormobjects.NewActiveJobsOps(ormStore), jobConfigOps: ormobjects.NewJobConfigOps(ormStore), jobIndexOps: ormobjects.NewJobIndexOps(ormStore), jobRuntimeOps: ormobjects.NewJobRuntimeOps(ormStore), taskConfigV2Ops: ormobjects.NewTaskConfigV2Ops(ormStore), jobFactory: jobFactory, mtx: NewMetrics(scope), cfg: &cfg, jobType: jobType, jobScope: jobScope, taskKillRateLimiter: rate.NewLimiter( cfg.RateLimiterConfig.TaskKill.Rate, cfg.RateLimiterConfig.TaskKill.Burst), executorShutShutdownRateLimiter: rate.NewLimiter( cfg.RateLimiterConfig.ExecutorShutdown.Rate, cfg.RateLimiterConfig.ExecutorShutdown.Burst), } driver.setState(stopped) driver.setCacheState(cleaned) return driver } // EnqueueJobWithDefaultDelay is a helper function to enqueue a job into the // goal state engine with the default interval at which the job runtime // updater is run. Using this function ensures that same job does not // get enqueued too many times when multiple task updates for the job // are received in a short duration of time. // TODO(zhixin): pass in job type instead of cachedJob, // remove GetJobType from the interface func EnqueueJobWithDefaultDelay( jobID *peloton.JobID, goalStateDriver Driver, cachedJob cached.Job) { goalStateDriver.EnqueueJob(jobID, time.Now().Add( goalStateDriver.JobRuntimeDuration(cachedJob.GetJobType()))) } type driver struct { // mutex to access jobEngine and taskEngine in this structure sync.RWMutex // jobEngine is the goal state engine for processing jobs and taskEngine is the goal // state engine for processing tasks. They are kept separate because there are much // fewer jobs than tasks, and job actions tend to run for more duration than // task actions and may require larger/more number of parallel Cassandra transactions. // Hence, setting one value for the engine properties like the number // of worker threads leads to problems. If the value is too small, then it results in // job starvation as well as not being able to run multiple tasks in parallel slowing // down the entire goal state engine processing. If the value is too large, then if all // the items scheduled are jobs, that can bring down Cassandra. jobEngine goalstate.Engine taskEngine goalstate.Engine updateEngine goalstate.Engine lm lifecyclemgr.Manager resmgrClient resmgrsvc.ResourceManagerServiceYARPCClient // jobStore, taskStore and volumeStore are the objects to the storage interface. jobStore storage.JobStore taskStore storage.TaskStore volumeStore storage.PersistentVolumeStore updateStore storage.UpdateStore podEventsOps ormobjects.PodEventsOps activeJobsOps ormobjects.ActiveJobsOps // DB ops for active_jobs table jobConfigOps ormobjects.JobConfigOps // DB ops for job_config table jobRuntimeOps ormobjects.JobRuntimeOps // DB ops for job_runtime table jobIndexOps ormobjects.JobIndexOps // DB ops for job_index table taskConfigV2Ops ormobjects.TaskConfigV2Ops // DB ops for task_config_v2_table // jobFactory is the in-memory cache object fpr jobs and tasks jobFactory cached.JobFactory cfg *Config // goal state engine configuration mtx *Metrics // goal state metrics running int32 // whether driver is running or not cacheState int32 // the state of driver cache jobType job.JobType // the type of the job for the driver // job scope for goalstate driver jobScope tally.Scope // rate limiter for goal state engine initiated task stop taskKillRateLimiter *rate.Limiter // rate limiter for goal state engine initiated executor shutdown executorShutShutdownRateLimiter *rate.Limiter } func (d *driver) EnqueueJob(jobID *peloton.JobID, deadline time.Time) { jobEntity := NewJobEntity(jobID, d) d.RLock() defer d.RUnlock() d.jobEngine.Enqueue(jobEntity, deadline) } func (d *driver) EnqueueTask(jobID *peloton.JobID, instanceID uint32, deadline time.Time) { taskEntity := NewTaskEntity(jobID, instanceID, d) d.RLock() defer d.RUnlock() d.taskEngine.Enqueue(taskEntity, deadline) } func (d *driver) EnqueueUpdate( jobID *peloton.JobID, updateID *peloton.UpdateID, deadline time.Time) { updateEntity := NewUpdateEntity(updateID, jobID, d) d.RLock() defer d.RUnlock() d.updateEngine.Enqueue(updateEntity, deadline) } func (d *driver) DeleteJob(jobID *peloton.JobID) { jobEntity := NewJobEntity(jobID, d) d.RLock() defer d.RUnlock() d.jobEngine.Delete(jobEntity) } func (d *driver) DeleteTask(jobID *peloton.JobID, instanceID uint32) { taskEntity := NewTaskEntity(jobID, instanceID, d) d.RLock() defer d.RUnlock() d.taskEngine.Delete(taskEntity) } func (d *driver) DeleteUpdate(jobID *peloton.JobID, updateID *peloton.UpdateID) { updateEntity := NewUpdateEntity(updateID, jobID, d) d.RLock() defer d.RUnlock() d.updateEngine.Delete(updateEntity) } func (d *driver) IsScheduledTask(jobID *peloton.JobID, instanceID uint32) bool { taskEntity := NewTaskEntity(jobID, instanceID, d) d.RLock() defer d.RUnlock() return d.taskEngine.IsScheduled(taskEntity) } func (d *driver) JobRuntimeDuration(jobType job.JobType) time.Duration { if jobType == job.JobType_BATCH { return d.cfg.JobBatchRuntimeUpdateInterval } return d.cfg.JobServiceRuntimeUpdateInterval } // recoverTasks recovers the job and tasks from DB when job manager instance // gains leadership. The jobs and tasks are loaded into the cached and enqueued // to the goal state engine for evaluation. func (d *driver) recoverTasks( ctx context.Context, id string, jobConfig *job.JobConfig, configAddOn *models.ConfigAddOn, jobRuntime *job.RuntimeInfo, batch recovery.TasksBatch, errChan chan<- error, ) { jobID := &peloton.JobID{Value: id} cachedJob := d.jobFactory.GetJob(jobID) // Do not set the job again if it already exists. if cachedJob == nil { cachedJob = d.jobFactory.AddJob(jobID) cachedJob.Update(ctx, &job.JobInfo{ Runtime: jobRuntime, Config: jobConfig, }, configAddOn, nil, cached.UpdateCacheOnly) } // Enqueue job into goal state d.EnqueueJob(jobID, time.Now().Add(d.JobRuntimeDuration(jobConfig.GetType()))) taskInfos, err := d.taskStore.GetTasksForJobByRange( ctx, jobID, &task.InstanceRange{ From: batch.From, To: batch.To, }) if err != nil { log.WithError(err). WithField("job_id", id). WithField("from", batch.From). WithField("to", batch.To). Error("failed to fetch task infos") if yarpcerrors.IsNotFound(err) { // Due to task_config table deprecation, we might see old jobs // fail to recover due to their task config was created in // task_config table instead of task_config_v2. Only log the // error here instead of crashing jobmgr. return } errChan <- err return } for instanceID, taskInfo := range taskInfos { d.mtx.taskMetrics.TaskRecovered.Inc(1) runtime := taskInfo.GetRuntime() // Do not add the task again if it already exists if cachedJob.GetTask(instanceID) == nil { cachedJob.ReplaceTasks( map[uint32]*task.TaskInfo{instanceID: taskInfo}, false, ) } // Do not evaluate goal state for tasks which will be evaluated using job create tasks action. if runtime.GetState() != task.TaskState_INITIALIZED || jobRuntime.GetState() != job.JobState_INITIALIZED { d.EnqueueTask(jobID, instanceID, time.Now()) } } // Recalculate the job resourceusage. Do this after recovering every task to // avoid getting into inconsistent state in case jobmgr restarts while a job // is partially recovered. if jobConfig.GetType() == job.JobType_BATCH { cachedJob.RecalculateResourceUsage(ctx) } // recover update if the job has an update, // and update is not already in cache. updateID := jobRuntime.GetUpdateID() if len(updateID.GetValue()) > 0 { d.EnqueueUpdate( jobID, updateID, time.Now().Add(d.JobRuntimeDuration(jobConfig.GetType()))) } return } // syncFromDB syncs the jobs and tasks in DB when job manager instance // gains leadership. // TODO find the right place to run recovery in job manager. func (d *driver) syncFromDB(ctx context.Context) error { log.Info("syncing cache and goal state with db") startRecoveryTime := time.Now() if err := recovery.RecoverActiveJobs( ctx, d.jobScope, d.activeJobsOps, d.jobConfigOps, d.jobRuntimeOps, d.recoverTasks, ); err != nil { return err } log.WithField("time_spent", time.Since(startRecoveryTime)). Info("syncing cache and goal state with db is finished") d.mtx.jobMetrics.JobRecoveryDuration.Update(float64(time.Since(startRecoveryTime) / time.Millisecond)) return nil } func (d *driver) Start() { for { state := d.getState() if state == starting || state == started { return } // make sure state did not change in-between if d.compareAndSwapState(state, starting) { break } } // only need to sync from DB if cache was cleaned up if d.getCacheState() == cleaned { if err := d.syncFromDB(context.Background()); err != nil { log.WithError(err). Fatal("failed to sync job manager with DB") } d.setCacheState(populated) } d.Lock() d.jobEngine.Start() d.taskEngine.Start() d.updateEngine.Start() d.Unlock() d.setState(started) log.Info("goalstate driver started") } func (d *driver) Started() bool { return d.getState() == started } func (d *driver) Stop(cleanUpCache bool) { for { // if cleanUpCache is set to true, but cache was not cleaned up, // continue the stopping process, no matter the current driver state, // because driver needs to get rid of the job factory cache if cleanUpCache && d.getCacheState() == populated { break } state := d.getState() if state == stopping || state == stopped { return } // make sure state did not change in-between if d.compareAndSwapState(state, stopping) { break } } d.Lock() d.updateEngine.Stop() d.taskEngine.Stop() d.jobEngine.Stop() d.Unlock() if cleanUpCache { d.cleanUpJobFactory() d.setCacheState(cleaned) } d.setState(stopped) log.Info("goalstate driver stopped") } // GetLockable returns the lockable interface of goal state driver, // which defines the components that can be locked. func (d *driver) GetLockable() lifecyclemgr.Lockable { return d.lm } func (d *driver) cleanUpJobFactory() { jobs := d.jobFactory.GetAllJobs() for jobID, cachedJob := range jobs { tasks := cachedJob.GetAllTasks() for instID := range tasks { d.DeleteTask(&peloton.JobID{ Value: jobID, }, instID) } d.DeleteJob(&peloton.JobID{ Value: jobID, }) workflows := cachedJob.GetAllWorkflows() for updateID := range workflows { d.DeleteUpdate(&peloton.JobID{ Value: jobID, }, &peloton.UpdateID{ Value: updateID, }) } } } // getState returns the running state of the driver func (d *driver) getState() driverState { return driverState(atomic.LoadInt32(&d.running)) } func (d *driver) compareAndSwapState(oldState driverState, newState driverState) bool { return atomic.CompareAndSwapInt32(&d.running, int32(oldState), int32(newState)) } func (d *driver) setState(state driverState) { atomic.StoreInt32(&d.running, int32(state)) } func (d *driver) setCacheState(state driverCacheState) { atomic.StoreInt32(&d.cacheState, int32(state)) } func (d *driver) getCacheState() driverCacheState { return driverCacheState(atomic.LoadInt32(&d.cacheState)) }