memstore/job_manager.go (412 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package memstore import ( "fmt" "github.com/uber/aresdb/memstore/common" "github.com/uber/aresdb/utils" "strings" "sync" ) // JobManager is responsible for generating new jobs to run and manages job related stats. type jobManager interface { generateJobs() []Job getJobDetails() interface{} deleteTable(table string) // mutator is guaranteed to be a functor by caller(scheduler). reportJobDetail(key string, mutator jobDetailMutator) } type archiveJobManager struct { sync.RWMutex // archiveJobDetails for different tables, shard. Key is {tableName}|{shardID}|archiving, // value is the job details. jobDetails map[string]*ArchiveJobDetail // For accessing meta data like archiving delay and interval memStore *memStoreImpl scheduler *schedulerImpl } // newArchiveJobManager creates a new jobManager to manage archive jobs. func newArchiveJobManager(scheduler *schedulerImpl) jobManager { return &archiveJobManager{ jobDetails: make(map[string]*ArchiveJobDetail), memStore: scheduler.memStore, scheduler: scheduler, } } // generateJobs iterates each table shard from memStore and prepare list of archive jobs // to run. A job should start to run only when newCutoff - cutoff > interval, where // newCutoff = now - delay. func (m *archiveJobManager) generateJobs() []Job { m.memStore.RLock() defer m.memStore.RUnlock() now := uint32(utils.Now().Unix()) var jobs []Job for tableName, shardMap := range m.memStore.TableShards { for shardID, tableShard := range shardMap { tableShard.Schema.RLock() if tableShard.Schema.Schema.IsFactTable && tableShard.IsDiskDataAvailable() { interval := tableShard.Schema.Schema.Config.ArchivingIntervalMinutes * 60 delay := tableShard.Schema.Schema.Config.ArchivingDelayMinutes * 60 currentCutoff := tableShard.ArchiveStore.CurrentVersion.ArchivingCutoff newCutoff := now - delay key := getIdentifier(tableName, shardID, common.ArchivingJobType) if newCutoff > currentCutoff+interval { job := m.scheduler.NewArchivingJob(tableName, shardID, newCutoff) jobs = append(jobs, job) m.reportArchiveJobDetail(key, func(jobDetail *ArchiveJobDetail) { jobDetail.Status = JobReady jobDetail.CurrentCutoff = currentCutoff }) } else { m.reportArchiveJobDetail(key, func(jobDetail *ArchiveJobDetail) { jobDetail.Status = JobWaiting jobDetail.CurrentCutoff = currentCutoff jobDetail.NextRun = utils.TimeStampToUTC(int64(currentCutoff + delay + interval)) }) } } tableShard.Schema.RUnlock() } } return jobs } func (m *archiveJobManager) getJobDetails() interface{} { m.RLock() defer m.RUnlock() return m.jobDetails } func (m *archiveJobManager) reportJobDetail(key string, jobMutator jobDetailMutator) { m.Lock() defer m.Unlock() archiveJobDetail := m.getJobDetail(key) jobDetail := &archiveJobDetail.JobDetail jobMutator(jobDetail) } func (m *archiveJobManager) reportArchiveJobDetail(key string, jobMutator ArchiveJobDetailMutator) { m.Lock() defer m.Unlock() jobMutator(m.getJobDetail(key)) } // caller needs to hold the write lock. func (m *archiveJobManager) getJobDetail(key string) *ArchiveJobDetail { jobDetail, found := m.jobDetails[key] if !found { jobDetail = &ArchiveJobDetail{} m.jobDetails[key] = jobDetail } return jobDetail } // deleteTable deletes metadata for the table in archiveJobManager. func (m *archiveJobManager) deleteTable(table string) { m.Lock() defer m.Unlock() for key := range m.jobDetails { if strings.HasPrefix(key, table) { delete(m.jobDetails, key) } } } // ArchivingJob defines the structure that an archiving job needs. type ArchivingJob struct { // table to archive tableName string // shard to archive shardID int // new cut off cutoff uint32 // for calling archiving function in memStore memStore MemStore // for reporting job detail changes reporter ArchiveJobDetailReporter } // Run starts the archiving process and wait for it to finish. func (job *ArchivingJob) Run() error { return job.memStore.Archive(job.tableName, job.shardID, job.cutoff, job.reporter) } // GetIdentifier returns a unique identifier of this job. func (job *ArchivingJob) GetIdentifier() string { return getIdentifier(job.tableName, job.shardID, common.ArchivingJobType) } // String gives meaningful string representation for this job func (job *ArchivingJob) String() string { return fmt.Sprintf("ArchivingJob<Table: %s, ShardID: %d, Cutoff: %d>", job.tableName, job.shardID, job.cutoff) } // JobType return job type func (job *ArchivingJob) JobType() common.JobType { return common.ArchivingJobType } type backfillJobManager struct { sync.RWMutex // backfillJobDetails for different tables, shard. Key is {tableName}|{shardID}|backfill, jobDetails map[string]*BackfillJobDetail // For accessing meta data like archiving delay and interval memStore *memStoreImpl scheduler *schedulerImpl } // newBackfillJobManager creates a new jobManager to manage backfill jobs. func newBackfillJobManager(scheduler *schedulerImpl) jobManager { return &backfillJobManager{ jobDetails: make(map[string]*BackfillJobDetail), memStore: scheduler.memStore, scheduler: scheduler, } } // generateJobs iterates each table shard from memStore and prepare list of backfill jobs // to run. func (m *backfillJobManager) generateJobs() []Job { m.memStore.RLock() defer m.memStore.RUnlock() now := uint32(utils.Now().Unix()) var jobs []Job for tableName, shardMap := range m.memStore.TableShards { for shardID, tableShard := range shardMap { tableShard.Schema.RLock() if tableShard.Schema.Schema.IsFactTable && tableShard.IsDiskDataAvailable() { key := getIdentifier(tableName, shardID, common.BackfillJobType) backfillMgr := tableShard.LiveStore.BackfillManager if backfillMgr.QualifyToTriggerBackfill() { // size based strategy job := m.scheduler.NewBackfillJob(tableName, shardID) jobs = append(jobs, job) m.scheduler.reportJob(key, func(jobDetail *JobDetail) { jobDetail.Status = JobReady }) } else { // timer based strategy interval := tableShard.Schema.Schema.Config.BackfillIntervalMinutes * 60 m.Lock() jobDetail := m.getJobDetail(key) m.Unlock() // the job detail has just been initialized. if jobDetail.LastRun.Unix() <= 0 { m.scheduler.reportJob(key, func(jobDetail *JobDetail) { jobDetail.Status = JobWaiting jobDetail.LastRun = utils.TimeStampToUTC(int64(now)) }) } else if int64(now) >= jobDetail.LastRun.Unix()+int64(interval) { // enqueue backfill job job := m.scheduler.NewBackfillJob(tableName, shardID) jobs = append(jobs, job) m.scheduler.reportJob(key, func(jobDetail *JobDetail) { jobDetail.Status = JobReady }) } } } tableShard.Schema.RUnlock() } } return jobs } func (m *backfillJobManager) getJobDetails() interface{} { m.RLock() defer m.RUnlock() return m.jobDetails } func (m *backfillJobManager) reportJobDetail(key string, jobMutator jobDetailMutator) { m.Lock() defer m.Unlock() backfillJobDetail := m.getJobDetail(key) jobDetail := &backfillJobDetail.JobDetail jobMutator(jobDetail) } func (m *backfillJobManager) reportBackfillJobDetail(key string, jobMutator BackfillJobDetailMutator) { m.Lock() defer m.Unlock() jobMutator(m.getJobDetail(key)) } // caller needs to hold the write lock. func (m *backfillJobManager) getJobDetail(key string) *BackfillJobDetail { jobDetail, found := m.jobDetails[key] if !found { jobDetail = &BackfillJobDetail{} m.jobDetails[key] = jobDetail } return jobDetail } // deleteTable deletes metadata for the table in backfillJobManager. func (m *backfillJobManager) deleteTable(table string) { m.Lock() defer m.Unlock() for key := range m.jobDetails { if strings.HasPrefix(key, table) { delete(m.jobDetails, key) } } } // BackfillJob defines the structure that a backfill job needs. type BackfillJob struct { // table to backfill tableName string // shard to backfill shardID int // for calling backfill function in memStore memStore MemStore // for reporting JobDetail changes. reporter BackfillJobDetailReporter } // Run starts the backfill process and wait for it to finish. func (job *BackfillJob) Run() error { return job.memStore.Backfill(job.tableName, job.shardID, job.reporter) } // GetIdentifier returns a unique identifier of this job. func (job *BackfillJob) GetIdentifier() string { return getIdentifier(job.tableName, job.shardID, common.BackfillJobType) } // String gives meaningful string representation for this job func (job *BackfillJob) String() string { return fmt.Sprintf("BackfillJob<Table: %s, ShardID: %d>", job.tableName, job.shardID) } // JobType return job type func (job *BackfillJob) JobType() common.JobType { return common.BackfillJobType } type snapshotJobManager struct { sync.RWMutex // snapshotJobDetails for different tables, shard. Key is {tableName}|{shardID}|snapshot, jobDetails map[string]*SnapshotJobDetail // For accessing meta data like archiving delay and interval memStore *memStoreImpl scheduler *schedulerImpl } // newSnapshotJobManager creates a new jobManager to manage snapshot jobs. func newSnapshotJobManager(scheduler *schedulerImpl) jobManager { return &snapshotJobManager{ jobDetails: make(map[string]*SnapshotJobDetail), memStore: scheduler.memStore, scheduler: scheduler, } } // generateJobs iterates each table shard from memStore and prepare list of snapshot jobs // to run. func (m *snapshotJobManager) generateJobs() []Job { m.memStore.RLock() defer m.memStore.RUnlock() var jobs []Job for tableName, shardMap := range m.memStore.TableShards { for shardID, tableShard := range shardMap { tableShard.Schema.RLock() if !tableShard.Schema.Schema.IsFactTable && tableShard.IsDiskDataAvailable() { key := getIdentifier(tableName, shardID, common.SnapshotJobType) snapshotManager := tableShard.LiveStore.SnapshotManager if snapshotManager.QualifyForSnapshot() { job := m.scheduler.NewSnapshotJob(tableName, shardID) jobs = append(jobs, job) m.scheduler.reportJob(key, func(jobDetail *JobDetail) { jobDetail.Status = JobReady }) } else { jobDetail := m.getJobDetail(key) // the job detail has just been initialized. if jobDetail.LastRun.Unix() == 0 { m.scheduler.reportJob(key, func(jobDetail *JobDetail) { jobDetail.Status = JobWaiting }) } } } tableShard.Schema.RUnlock() } } return jobs } func (m *snapshotJobManager) getJobDetails() interface{} { m.RLock() defer m.RUnlock() return m.jobDetails } // deleteTable deletes metadata for the table in snapshotJobManager. func (m *snapshotJobManager) deleteTable(table string) { m.Lock() defer m.Unlock() for key := range m.jobDetails { if strings.HasPrefix(key, table) { delete(m.jobDetails, key) } } } func (m *snapshotJobManager) reportJobDetail(key string, jobMutator jobDetailMutator) { m.Lock() defer m.Unlock() snapshotJobDetail := m.getJobDetail(key) jobDetail := &snapshotJobDetail.JobDetail jobMutator(jobDetail) } func (m *snapshotJobManager) reportSnapshotJobDetail(key string, jobMutator SnapshotJobDetailMutator) { m.Lock() defer m.Unlock() jobMutator(m.getJobDetail(key)) } // caller needs to hold the write lock. func (m *snapshotJobManager) getJobDetail(key string) *SnapshotJobDetail { jobDetail, found := m.jobDetails[key] if !found { jobDetail = &SnapshotJobDetail{} m.jobDetails[key] = jobDetail } return jobDetail } // SnapshotJob defines the structure that a snapshot job needs. type SnapshotJob struct { // table to snapshot tableName string // shard to snapshot shardID int // for calling snapshot function in memStore memStore MemStore // for reporting snapshot JobDetail changes reporter SnapshotJobDetailReporter } // Run starts the snapshot process and wait for it to finish. func (job *SnapshotJob) Run() error { return job.memStore.Snapshot(job.tableName, job.shardID, job.reporter) } // GetIdentifier returns a unique identifier of this job. func (job *SnapshotJob) GetIdentifier() string { return getIdentifier(job.tableName, job.shardID, common.SnapshotJobType) } // String gives meaningful string representation for this job func (job *SnapshotJob) String() string { return fmt.Sprintf("SnapshotJob<Table: %s, ShardID: %d>", job.tableName, job.shardID) } // JobType return job type func (job *SnapshotJob) JobType() common.JobType { return common.SnapshotJobType } type purgeJobManager struct { sync.RWMutex // purge job details for different tables, shard. Key is {tableName}|{shardID}|purge, jobDetails map[string]*PurgeJobDetail memStore *memStoreImpl scheduler *schedulerImpl } // newPurgeJobManager creates a new jobManager to manage purge jobs. func newPurgeJobManager(scheduler *schedulerImpl) jobManager { return &purgeJobManager{ jobDetails: make(map[string]*PurgeJobDetail), memStore: scheduler.memStore, scheduler: scheduler, } } // generateJobs iterates each table shard from memStore and prepare list of purge jobs // to run. func (m *purgeJobManager) generateJobs() []Job { m.memStore.RLock() defer m.memStore.RUnlock() nowInDay := int(utils.Now().Unix() / 86400) var jobs []Job for tableName, shardMap := range m.memStore.TableShards { for shardID, tableShard := range shardMap { if !tableShard.IsDiskDataAvailable() { continue } retentionDays := tableShard.Schema.Schema.Config.RecordRetentionInDays key := getIdentifier(tableName, shardID, common.PurgeJobType) if tableShard.ArchiveStore.PurgeManager.QualifyForPurge() && tableShard.Schema.Schema.IsFactTable && retentionDays > 0 { batchCutOff := nowInDay - retentionDays jobs = append(jobs, m.scheduler.NewPurgeJob(tableName, shardID, 0, nowInDay-retentionDays)) m.reportPurgeJobDetail(key, func(jobDetail *PurgeJobDetail) { jobDetail.Status = JobReady jobDetail.BatchIDStart = 0 jobDetail.BatchIDEnd = batchCutOff }) } } } return jobs } func (m *purgeJobManager) getJobDetails() interface{} { m.RLock() defer m.RUnlock() return m.jobDetails } func (m *purgeJobManager) getJobDetail(key string) *PurgeJobDetail { jobDetail, found := m.jobDetails[key] if !found { jobDetail = &PurgeJobDetail{} m.jobDetails[key] = jobDetail } return jobDetail } func (m *purgeJobManager) reportJobDetail(key string, jobMutator jobDetailMutator) { m.Lock() defer m.Unlock() purgeJobDetail := m.getJobDetail(key) jobDetail := &purgeJobDetail.JobDetail jobMutator(jobDetail) } // deleteTable deletes metadata for the table in purgeJobManager. func (m *purgeJobManager) deleteTable(table string) { m.Lock() defer m.Unlock() for key := range m.jobDetails { if strings.HasPrefix(key, table) { delete(m.jobDetails, key) } } } func (m *purgeJobManager) reportPurgeJobDetail(key string, jobMutator PurgeJobDetailMutator) { m.Lock() defer m.Unlock() jobMutator(m.getJobDetail(key)) } // PurgeJob defines the structure that a purge job needs. type PurgeJob struct { tableName string shardID int // max batch id to purge batchIDStart int batchIDEnd int memStore MemStore reporter PurgeJobDetailReporter } // Run starts the purge process and wait for it to finish. func (job *PurgeJob) Run() error { return job.memStore.Purge(job.tableName, job.shardID, job.batchIDStart, job.batchIDEnd, job.reporter) } // GetIdentifier returns a unique identifier of this job. func (job *PurgeJob) GetIdentifier() string { return getIdentifier(job.tableName, job.shardID, common.PurgeJobType) } // String gives meaningful string representation for this job func (job *PurgeJob) String() string { return fmt.Sprintf("PurgeJob<Table: %s, ShardID: %d>", job.tableName, job.shardID) } // JobType return job type func (job *PurgeJob) JobType() common.JobType { return common.PurgeJobType }