memstore/scheduler.go (221 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package memstore import ( "fmt" "sync" "time" "strings" "github.com/uber/aresdb/memstore/common" "github.com/uber/aresdb/utils" ) const ( // interval for scheduler schedulerInterval = time.Minute ) // jobBundle binds a result channel together with the job. // Listening on the result channel will block until job finishes. // Nil error indicates job runs successfully. type jobBundle struct { Job resChan chan error } // Scheduler is for scheduling archiving jobs (and later backfill jobs) for table shards // in memStore. It scans through all tables and shards to generate list of eligible jobs // to run. type Scheduler interface { Start() Stop() SubmitJob(job Job) (error, chan error) DeleteTable(table string, isFactTable bool) GetJobDetails(jobType common.JobType) interface{} NewBackfillJob(tableName string, shardID int) Job NewArchivingJob(tableName string, shardID int, cutoff uint32) Job NewSnapshotJob(tableName string, shardID int) Job NewPurgeJob(tableName string, shardID int, batchIDStart int, batchIDEnd int) Job EnableJobType(jobType common.JobType, enable bool) IsJobTypeEnabled(jobType common.JobType) bool utils.RWLocker } // newScheduler returns a new Scheduler. 
func newScheduler(m *memStoreImpl) *schedulerImpl { s := &schedulerImpl{ memStore: m, schedulerStopChan: make(chan struct{}), jobBundleChan: make(chan jobBundle), executorStopChan: make(chan struct{}), jobManagers: make(map[common.JobType]jobManager), jobEnableFlags: make(map[common.JobType]bool), } s.jobManagers[common.ArchivingJobType] = newArchiveJobManager(s) s.jobManagers[common.BackfillJobType] = newBackfillJobManager(s) s.jobManagers[common.SnapshotJobType] = newSnapshotJobManager(s) s.jobManagers[common.PurgeJobType] = newPurgeJobManager(s) return s } // schedulerImpl is the implementation of Scheduler interface. type schedulerImpl struct { // Protecting JobRunning. sync.RWMutex // For accessing meta data like archiving delay and interval memStore *memStoreImpl // Stop main scheduler loop. schedulerStopChan chan struct{} // Channel for executing job. jobBundleChan chan jobBundle // Stop executor loop. executorStopChan chan struct{} jobManagers map[common.JobType]jobManager jobEnableFlags map[common.JobType]bool archivingStarted bool } func (scheduler *schedulerImpl) EnableJobType(jobType common.JobType, enable bool) { scheduler.Lock() scheduler.jobEnableFlags[jobType] = enable scheduler.Unlock() } func (scheduler *schedulerImpl) IsJobTypeEnabled(jobType common.JobType) bool { scheduler.RLock() defer scheduler.RUnlock() enabled, ok := scheduler.jobEnableFlags[jobType] return !ok || enabled } func (scheduler *schedulerImpl) reportJob(key string, mutator jobDetailMutator) { scheduler.Lock() defer scheduler.Unlock() comps := strings.SplitN(key, "|", 3) if len(comps) < 3 { return } if jobManager, ok := scheduler.jobManagers[common.JobType(comps[2])]; ok { jobManager.reportJobDetail(key, mutator) } } // getIdentifier returns a unique identifier from table, shard and job type. 
func getIdentifier(tableName string, shardID int, jobType common.JobType) string {
	const identifierFormat = "%s|%d|%s"
	return fmt.Sprintf(identifierFormat, tableName, shardID, jobType)
}

// GetJobDetails returns the details tracked by the job manager for jobType,
// or nil when no manager is registered for that type.
func (scheduler *schedulerImpl) GetJobDetails(jobType common.JobType) interface{} {
	manager, found := scheduler.jobManagers[jobType]
	if !found {
		return nil
	}
	return manager.getJobDetails()
}

// DeleteTable drops the job details kept for table. Fact tables have
// archiving, backfill and purge details; other tables only have snapshot
// details.
func (scheduler *schedulerImpl) DeleteTable(table string, isFactTable bool) {
	var affected []common.JobType
	if isFactTable {
		affected = []common.JobType{
			common.ArchivingJobType,
			common.BackfillJobType,
			common.PurgeJobType,
		}
	} else {
		affected = []common.JobType{common.SnapshotJobType}
	}
	for _, jt := range affected {
		scheduler.jobManagers[jt].deleteTable(table)
	}
}

// GetJobManager returns the job manager registered for jobType, or nil if
// none is registered.
func (scheduler *schedulerImpl) GetJobManager(jobType common.JobType) jobManager {
	manager := scheduler.jobManagers[jobType]
	return manager
}

// NewArchivingJob builds an ArchivingJob for the given table shard and cutoff,
// wired to report its progress to the archiving job manager.
func (scheduler *schedulerImpl) NewArchivingJob(tableName string, shardID int, cutoff uint32) Job {
	manager := scheduler.jobManagers[common.ArchivingJobType].(*archiveJobManager)
	job := &ArchivingJob{
		tableName: tableName,
		shardID:   shardID,
		cutoff:    cutoff,
		memStore:  scheduler.memStore,
		reporter:  manager.reportArchiveJobDetail,
	}
	return job
}

// NewBackfillJob builds a BackfillJob for the given table shard, wired to
// report its progress to the backfill job manager.
func (scheduler *schedulerImpl) NewBackfillJob(tableName string, shardID int) Job {
	manager := scheduler.jobManagers[common.BackfillJobType].(*backfillJobManager)
	job := &BackfillJob{
		tableName: tableName,
		shardID:   shardID,
		memStore:  scheduler.memStore,
		reporter:  manager.reportBackfillJobDetail,
	}
	return job
}

// NewSnapshotJob returns a new SnapshotJob for the given table shard.
func (scheduler *schedulerImpl) NewSnapshotJob(tableName string, shardID int) Job { return &SnapshotJob{ tableName: tableName, shardID: shardID, memStore: scheduler.memStore, reporter: scheduler.jobManagers[common.SnapshotJobType].(*snapshotJobManager).reportSnapshotJobDetail, } } // NewPurgeJob returns a new PurgeJob func (scheduler *schedulerImpl) NewPurgeJob(tableName string, shardID, batchIDStart, batchIDEnd int) Job { return &PurgeJob{ tableName: tableName, shardID: shardID, batchIDStart: batchIDStart, batchIDEnd: batchIDEnd, memStore: scheduler.memStore, reporter: scheduler.jobManagers[common.PurgeJobType].(*purgeJobManager).reportPurgeJobDetail, } } // Start starts the scheduler. It creates a new time.Timer every time to wait // at least schedulerInterval time instead of running at every tick so that we // will skip the tick if a single round takes more than one minute. This prevents // accessing memStore (and lock) too many times during a short period. func (scheduler *schedulerImpl) Start() { timer := time.NewTimer(schedulerInterval) // Scheduler loop. go func() { for { select { case <-timer.C: scheduler.run() // Since we already receive the event from channel, // there is no need to stop it and we can directly reset the timer. timer.Reset(schedulerInterval) case <-scheduler.schedulerStopChan: // It will block on waiting for executor to stop. scheduler.executorStopChan <- struct{}{} return } } }() // Executor loop. 
go func() { for { select { case jobBundle := <-scheduler.jobBundleChan: job := jobBundle.Job utils.GetLogger().With("job", job).Info("Received job") scheduler.executeJob(&jobBundle) case <-scheduler.executorStopChan: return } } }() } func (scheduler *schedulerImpl) executeJob(jb *jobBundle) { job := jb.Job utils.GetLogger().With("job", job).Info("Running job") scheduler.reportJob(job.GetIdentifier(), func(jobDetail *JobDetail) { jobDetail.Status = JobRunning jobDetail.LastStartTime = utils.Now().UTC() }) err := jb.Run() // Set job status according to the result. now := uint32(utils.Now().Unix()) if err != nil { utils.GetLogger().With("error", err, "job", job).Error("Failed to run job due to error") scheduler.reportJob(job.GetIdentifier(), func(jobDetail *JobDetail) { jobDetail.LastError = err jobDetail.Status = JobFailed jobDetail.LastRun = utils.TimeStampToUTC(int64(now)) }) } else { utils.GetLogger().With("job", job).Info("Succeeded to run job") scheduler.reportJob(job.GetIdentifier(), func(jobDetail *JobDetail) { jobDetail.LastError = nil jobDetail.Status = JobSucceeded jobDetail.LastRun = utils.TimeStampToUTC(int64(now)) }) } // This is a non-blocking channel sending. jb.resChan <- err } // Stop stops the scheduler. func (scheduler *schedulerImpl) Stop() { scheduler.schedulerStopChan <- struct{}{} } // SubmitJob will submit a job to executor and block until it starts. // Job submitter can decide whether to wait for job to finish and get // the result. func (scheduler *schedulerImpl) SubmitJob(job Job) (error, chan error) { if !scheduler.IsJobTypeEnabled(job.JobType()) { // this check is to block request from debug handler return fmt.Errorf("JobType %s disabled", job.JobType()), nil } jb := jobBundle{job, make(chan error, 1)} scheduler.jobBundleChan <- jb utils.GetLogger().With("job", job).Info("Submitted job") return nil, jb.resChan } // run runs at every tick. 
It first generates a list of jobs to run based on current condition, // then it runs every job sequentially in the same process. func (scheduler *schedulerImpl) run() { for jobType, jobManager := range scheduler.jobManagers { if !scheduler.IsJobTypeEnabled(jobType) { continue } for _, job := range jobManager.generateJobs() { // Waiting for job to finish. err, errChan := scheduler.SubmitJob(job) if err == nil { if err := <-errChan; err != nil { utils.GetLogger().With("job", job).Panic("Panic due to failure to run job") } } else { utils.GetLogger().With("job", job).Error("Fail to submit job") } } } } // Job defines the common interface for BackfillJob, ArchivingJob and SnapshotJob type Job interface { JobType() common.JobType Run() error GetIdentifier() string String() string }