pkg/jobmgr/cached/job_factory.go

// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cached

import (
	"sync"
	"time"

	pbjob "github.com/uber/peloton/.gen/peloton/api/v0/job"
	"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
	pbtask "github.com/uber/peloton/.gen/peloton/api/v0/task"
	pbupdate "github.com/uber/peloton/.gen/peloton/api/v0/update"
	v1peloton "github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton"
	"github.com/uber/peloton/.gen/peloton/api/v1alpha/pod"
	"github.com/uber/peloton/.gen/peloton/private/models"

	"github.com/uber/peloton/pkg/common/api"
	"github.com/uber/peloton/pkg/common/util"
	"github.com/uber/peloton/pkg/storage"
	ormobjects "github.com/uber/peloton/pkg/storage/objects"

	log "github.com/sirupsen/logrus"
	"github.com/uber-go/tally"
)

// JobFactory is the entrypoint object into the cache which stores jobs and tasks.
// This only runs in the job manager leader.
type JobFactory interface {
	// AddJob will create a Job if not present in cache,
	// else returns the current cached Job.
	AddJob(id *peloton.JobID) Job

	// ClearJob cleans up the job from the cache.
	ClearJob(jobID *peloton.JobID)

	// GetJob will return the current cached Job,
	// and nil if currently not in cache.
	GetJob(id *peloton.JobID) Job

	// GetAllJobs returns the list of all jobs in cache.
	GetAllJobs() map[string]Job

	// Start emitting metrics.
	Start()

	// Stop clears the current jobs and tasks in cache, stops metrics.
	Stop()
}

type jobFactory struct {
	sync.RWMutex // Mutex to acquire before accessing any variables in the job factory object

	// map of active jobs (job identifier -> cache job object) in the system
	jobs    map[string]*job
	running bool // whether job factory is running

	jobStore    storage.JobStore              // storage job store object
	taskStore   storage.TaskStore             // storage task store object
	updateStore storage.UpdateStore           // storage update store object
	volumeStore storage.PersistentVolumeStore // storage volume store object

	activeJobsOps      ormobjects.ActiveJobsOps      // DB ops for active_jobs table
	jobIndexOps        ormobjects.JobIndexOps        // DB ops for job_index table
	jobConfigOps       ormobjects.JobConfigOps       // DB ops for job_config table
	jobRuntimeOps      ormobjects.JobRuntimeOps      // DB ops for job_runtime table
	jobNameToIDOps     ormobjects.JobNameToIDOps     // DB ops for job_name_to_id table
	jobUpdateEventsOps ormobjects.JobUpdateEventsOps // DB ops for job_update_events table
	taskConfigV2Ops    ormobjects.TaskConfigV2Ops    // DB ops for task_config_v2 table

	mtx         *Metrics     // cache metrics
	taskMetrics *TaskMetrics // task metrics

	// Job/task listeners. This list is immutable after the object is created,
	// so it can be read without a lock.
	listeners []JobTaskListener

	// channel to indicate that the job factory needs to stop
	stopChan chan struct{}
}

// InitJobFactory initializes the job factory object.
func InitJobFactory(
	jobStore storage.JobStore,
	taskStore storage.TaskStore,
	updateStore storage.UpdateStore,
	volumeStore storage.PersistentVolumeStore,
	ormStore *ormobjects.Store,
	parentScope tally.Scope,
	listeners []JobTaskListener,
) JobFactory {
	return &jobFactory{
		jobs:               map[string]*job{},
		jobStore:           jobStore,
		taskStore:          taskStore,
		updateStore:        updateStore,
		volumeStore:        volumeStore,
		activeJobsOps:      ormobjects.NewActiveJobsOps(ormStore),
		jobIndexOps:        ormobjects.NewJobIndexOps(ormStore),
		jobConfigOps:       ormobjects.NewJobConfigOps(ormStore),
		jobRuntimeOps:      ormobjects.NewJobRuntimeOps(ormStore),
		jobNameToIDOps:     ormobjects.NewJobNameToIDOps(ormStore),
		jobUpdateEventsOps: ormobjects.NewJobUpdateEventsOps(ormStore),
		taskConfigV2Ops:    ormobjects.NewTaskConfigV2Ops(ormStore),
		mtx:                NewMetrics(parentScope.SubScope("cache")),
		taskMetrics:        NewTaskMetrics(parentScope.SubScope("task")),
		listeners:          listeners,
	}
}

func (f *jobFactory) AddJob(id *peloton.JobID) Job {
	if j := f.GetJob(id); j != nil {
		return j
	}

	f.Lock()
	defer f.Unlock()
	// check whether the job exists again, in case it
	// was created between the RUnlock and the Lock
	j, ok := f.jobs[id.GetValue()]
	if !ok {
		j = newJob(id, f)
		f.jobs[id.GetValue()] = j
	}

	return j
}

// ClearJob removes the job and all its tasks from the cache
func (f *jobFactory) ClearJob(id *peloton.JobID) {
	j := f.GetJob(id)
	if j == nil {
		return
	}

	f.Lock()
	defer f.Unlock()
	delete(f.jobs, j.ID().GetValue())
}

func (f *jobFactory) GetJob(id *peloton.JobID) Job {
	f.RLock()
	defer f.RUnlock()

	if j, ok := f.jobs[id.GetValue()]; ok {
		return j
	}

	return nil
}

func (f *jobFactory) GetAllJobs() map[string]Job {
	f.RLock()
	defer f.RUnlock()

	jobMap := make(map[string]Job)
	for k, v := range f.jobs {
		jobMap[k] = v
	}
	return jobMap
}

// Start the job factory, starts emitting metrics.
func (f *jobFactory) Start() {
	f.Lock()
	defer f.Unlock()

	if f.running {
		return
	}
	f.running = true

	f.stopChan = make(chan struct{})
	go f.runPublishMetrics(f.stopChan)
	log.Info("job factory started")
}

// Stop clears the current jobs and tasks in cache, stops emitting metrics.
func (f *jobFactory) Stop() {
	f.Lock()
	defer f.Unlock()

	// Do not do anything if not running
	if !f.running {
		log.Info("job factory stopped")
		return
	}

	f.running = false
	f.jobs = map[string]*job{}
	close(f.stopChan)
	log.Info("job factory stopped")
}
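// Example (hypothetical usage sketch): how the job manager leader might wire
// up and drive the factory. The jobStore, taskStore, updateStore, volumeStore,
// ormStore, parentScope, and jobUUID values below are assumed placeholders.
//
//	factory := InitJobFactory(
//		jobStore, taskStore, updateStore, volumeStore,
//		ormStore, parentScope, nil /* listeners */)
//	factory.Start()      // begins periodic cache metrics publishing
//	defer factory.Stop() // clears cached jobs/tasks and stops metrics
//
//	cachedJob := factory.AddJob(&peloton.JobID{Value: jobUUID})
//	_ = cachedJob // interact with the cached job, e.g. cachedJob.ID()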
// TODO: Refactor to move the metrics loop into a separate component.
// JobFactory should only implement an interface like MetricsProvider
// to periodically publish metrics instead of having its own goroutine.

// runPublishMetrics is the entrypoint to start and stop publishing cache metrics
func (f *jobFactory) runPublishMetrics(stopChan <-chan struct{}) {
	ticker := time.NewTicker(_defaultMetricsUpdateTick)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			f.publishMetrics()
		case <-stopChan:
			return
		}
	}
}

// publishMetrics is the routine which publishes cache metrics to M3.
// It returns the task state counts for test purposes.
func (f *jobFactory) publishMetrics() map[pbtask.TaskState]map[pbtask.TaskState]int {
	stopWatch := f.mtx.scope.Timer("publish_duration").Start()
	defer stopWatch.Stop()

	// Initialize the task count map for all possible pairs of (state, goal_state)
	tCount := map[pbtask.TaskState]map[pbtask.TaskState]int{}
	for s := range pbtask.TaskState_name {
		tCount[pbtask.TaskState(s)] = map[pbtask.TaskState]int{}
		for gs := range pbtask.TaskState_name {
			tCount[pbtask.TaskState(s)][pbtask.TaskState(gs)] = 0
		}
	}

	// Initialize the update count map for all states
	workflowCount := map[pbupdate.State]int{}
	for s := range pbupdate.State_name {
		workflowCount[pbupdate.State(s)] = 0
	}

	// Iterate through the jobs and tasks and count
	jobs := f.GetAllJobs()
	var (
		totalThrottledTasks int
		spreadQuotientSum   float64
		spreadQuotientCount int64
		slaViolatedJobIDs   []string
	)
	for _, j := range jobs {
		unavailableInstances := uint32(0)
		unknownInstances := uint32(0)
		taskStateCount, throttledTasks, spread := j.GetTaskStateCount()
		for stateSummary, count := range taskStateCount {
			currentState := stateSummary.CurrentState
			goalState := stateSummary.GoalState
			healthState := stateSummary.HealthState
			tCount[currentState][goalState] += count

			if currentState == pbtask.TaskState_UNKNOWN ||
				goalState == pbtask.TaskState_UNKNOWN {
				unknownInstances = unknownInstances + uint32(count)
				continue
			}

			if goalState == pbtask.TaskState_RUNNING {
				if currentState == pbtask.TaskState_RUNNING {
					switch healthState {
					case pbtask.HealthState_DISABLED, pbtask.HealthState_HEALTHY:
						continue
					}
				}
				unavailableInstances = unavailableInstances + uint32(count)
			}
		}
		totalThrottledTasks = totalThrottledTasks + throttledTasks

		workflowStateCount := j.GetWorkflowStateCount()
		for currentState, count := range workflowStateCount {
			workflowCount[currentState] += count
		}

		if spread.hostCount > 0 {
			spreadQuotientCount++
			spreadQuotientSum += float64(spread.taskCount) / float64(spread.hostCount)
		}

		// SLA is currently defined only for stateless jobs
		if j.GetJobType() != pbjob.JobType_SERVICE {
			continue
		}

		jobConfig := j.GetCachedConfig()
		if jobConfig == nil {
			log.WithField("job_id", j.ID().GetValue()).
				Debug("job config not present in cache, skipping SLA metrics for job")
			continue
		}

		if unknownInstances > 0 {
			log.WithFields(log.Fields{
				"job_id":                j.ID().GetValue(),
				"num_unknown_instances": unknownInstances,
			}).Debug("job has instances in unknown state")
		}

		var hasActiveUpdate bool
		for _, w := range j.GetAllWorkflows() {
			if w.GetWorkflowType() == models.WorkflowType_UPDATE &&
				IsUpdateStateActive(w.GetState().State) {
				hasActiveUpdate = true
				break
			}
		}

		// skip the SLA violation check if the job has an ongoing update
		if hasActiveUpdate {
			continue
		}

		if jobConfig.GetSLA().GetMaximumUnavailableInstances() > 0 &&
			unavailableInstances > jobConfig.GetSLA().GetMaximumUnavailableInstances() {
			log.WithField("job_id", j.ID().GetValue()).
				Info("job sla violated")
			slaViolatedJobIDs = append(slaViolatedJobIDs, j.ID().GetValue())
		}
	}

	// Publish
	f.mtx.scope.Gauge("jobs_count").Update(float64(len(jobs)))
	f.mtx.scope.Gauge("sla_violated_jobs").Update(float64(len(slaViolatedJobIDs)))
	f.mtx.scope.Gauge("throttled_tasks").Update(float64(totalThrottledTasks))
	if spreadQuotientCount > 0 {
		f.taskMetrics.MeanSpreadQuotient.
			Update(spreadQuotientSum / float64(spreadQuotientCount))
	}
	for s, sm := range tCount {
		for gs, tc := range sm {
			f.mtx.scope.
				Tagged(map[string]string{"state": s.String(), "goal_state": gs.String()}).
				Gauge("tasks_count").
				Update(float64(tc))
		}
	}

	for s, tc := range workflowCount {
		f.mtx.scope.
			Tagged(map[string]string{"state": s.String()}).
			Gauge("workflow_count").
			Update(float64(tc))
	}

	return tCount
}

func (f *jobFactory) notifyJobSummaryChanged(
	jobID *peloton.JobID,
	jobType pbjob.JobType,
	jobSummary *pbjob.JobSummary,
	updateInfo *models.UpdateModel,
) {
	if jobSummary == nil {
		return
	}

	if jobType == pbjob.JobType_SERVICE {
		s := api.ConvertJobSummary(jobSummary, updateInfo)
		for _, l := range f.listeners {
			l.StatelessJobSummaryChanged(s)
		}
		// TODO add metric for listener execution latency
	}

	if jobType == pbjob.JobType_BATCH {
		for _, l := range f.listeners {
			l.BatchJobSummaryChanged(jobID, jobSummary)
		}
		// TODO add metric for listener execution latency
	}
}

func (f *jobFactory) notifyTaskRuntimeChanged(
	jobID *peloton.JobID,
	instanceID uint32,
	jobType pbjob.JobType,
	runtime *pbtask.RuntimeInfo,
	labels []*peloton.Label,
) {
	if runtime != nil {
		summary := &pod.PodSummary{
			PodName: &v1peloton.PodName{
				Value: util.CreatePelotonTaskID(jobID.GetValue(), instanceID),
			},
			Status: api.ConvertTaskRuntimeToPodStatus(runtime),
		}
		for _, l := range f.listeners {
			l.PodSummaryChanged(jobType, summary, api.ConvertLabels(labels))
		}
		// TODO add metric for listener execution latency
	}
}