collector/checkpoint.go (138 lines of code) (raw):

package collector import ( "errors" "fmt" "sort" "sync/atomic" "time" "github.com/alibaba/MongoShake/v2/collector/ckpt" conf "github.com/alibaba/MongoShake/v2/collector/configure" utils "github.com/alibaba/MongoShake/v2/common" LOG "github.com/vinllen/log4go" ) func (sync *OplogSyncer) newCheckpointManager(name string, startPosition interface{}) { LOG.Info("Oplog sync[%v] create checkpoint manager with url[%s] table[%s.%s] start-position[%v]", name, utils.BlockMongoUrlPassword(conf.Options.CheckpointStorageUrl, "***"), conf.Options.CheckpointStorageDb, conf.Options.CheckpointStorageCollection, utils.ExtractTimestampForLog(startPosition)) if val, ok := startPosition.(int64); ok { sync.ckptManager = ckpt.NewCheckpointManager(name, val) } else { sync.ckptManager = ckpt.NewCheckpointManager(name, 0) } } /* * load checkpoint and do some checks */ func (sync *OplogSyncer) loadCheckpoint() error { checkpoint, exists, err := sync.ckptManager.Get() if err != nil { return fmt.Errorf("load checkpoint[%v] failed[%v]", sync.Replset, err) } LOG.Info("load checkpoint value: %s", checkpoint) // set fetch method if not exists or empty if !exists || checkpoint.FetchMethod == "" { sync.ckptManager.SetFetchMethod(conf.Options.IncrSyncMongoFetchMethod) } // not enable oplog persist? if !conf.Options.FullSyncReaderOplogStoreDisk { sync.persister.SetFetchStage(utils.FetchStageStoreMemoryApply) return nil } ts := time.Now() // if no checkpoint exists if !exists { sync.persister.SetFetchStage(utils.FetchStageStoreDiskNoApply) dqName := fmt.Sprintf("diskqueue-%v-%v", sync.Replset, ts.Format("20060102-150405")) sync.persister.InitDiskQueue(dqName) sync.ckptManager.SetOplogDiskQueueName(dqName) sync.ckptManager.SetOplogDiskFinishTs(ckpt.InitCheckpoint) // set as init return nil } // check if checkpoint real ts >= checkpoint disk last ts if checkpoint.OplogDiskQueueFinishTs > 0 && checkpoint.Timestamp >= checkpoint.OplogDiskQueueFinishTs { // no need to init disk queue again sync.persister.SetFetchStage(utils.FetchStageStoreMemoryApply) return nil } // TODO, there is a bug if MongoShake restarts // need to init sync.persister.SetFetchStage(utils.FetchStageStoreDiskNoApply) sync.persister.InitDiskQueue(checkpoint.OplogDiskQueue) return nil } /* * calculate and update current checkpoint value. `flush` means whether force calculate & update checkpoint. * if inputTs is given(> 0), use this value to update checkpoint, otherwise, calculate from workers. */ func (sync *OplogSyncer) checkpoint(flush bool, inputTs int64) { now := time.Now() // do checkpoint every once in a while if !flush && sync.ckptTime.Add(time.Duration(conf.Options.CheckpointInterval)*time.Millisecond).After(now) { LOG.Debug("do not repeat update checkpoint in %v milliseconds", conf.Options.CheckpointInterval) return } // we force update the ckpt time even failed sync.ckptTime = now LOG.Info("checkpoint update sync.ckptTime to:%v", sync.ckptTime) // we delayed a few minutes to tolerate the receiver's flush buffer // in AckRequired() tunnel. such as "rpc". While collector is restarted, // we can't get the correct worker ack offset since collector have lost // the unack offset... if !flush && conf.Options.Tunnel != utils.VarTunnelDirect && now.Before(sync.startTime.Add(1*time.Minute)) { LOG.Info("CheckpointOperation requires three minutes at least to flush receiver's buffer") return } // read all workerGroup self ckpt. get minimum of all updated checkpoint inMemoryTs := sync.ckptManager.GetInMemory().Timestamp var lowest int64 = 0 var err error if inputTs > 0 { // use inputTs if inputTs is > 0 lowest = inputTs } else { lowest, err = sync.calculateWorkerLowestCheckpoint() } LOG.Info("checkpoint func lowest:%v inMemoryTs:%v flush:%v inputTs:%v", utils.ExtractTimestampForLog(lowest), utils.ExtractTimestampForLog(inMemoryTs), flush, inputTs) lowestInt64 := lowest // if all oplogs from disk has been replayed successfully, store the newest oplog timestamp if conf.Options.FullSyncReaderOplogStoreDisk && sync.persister.diskQueueLastTs > 0 { if lowestInt64 >= sync.persister.diskQueueLastTs { sync.ckptManager.SetOplogDiskFinishTs(sync.persister.diskQueueLastTs) sync.persister.diskQueueLastTs = -2 // mark -1 so next time won't call } } if lowest > 0 && err == nil { switch { case lowestInt64 > inMemoryTs: if err = sync.ckptManager.Update(lowestInt64); err == nil { LOG.Info("CheckpointOperation write success. updated from %v to %v", utils.ExtractTimestampForLog(inMemoryTs), utils.ExtractTimestampForLog(lowest)) sync.replMetric.AddCheckpoint(1) sync.replMetric.SetLSNCheckpoint(lowest) return } case lowestInt64 < inMemoryTs: LOG.Info("CheckpointOperation calculated[%v] is smaller than value in memory[%v]", utils.ExtractTimestampForLog(lowest), utils.ExtractTimestampForLog(inMemoryTs)) return case lowestInt64 == inMemoryTs: return } } // this log will be print if no ack calculated LOG.Warn("CheckpointOperation updated is not suitable. lowest [%d]. current [%v]. inputTs [%v]. reason : %v", lowest, utils.ExtractTimestampForLog(inMemoryTs), inputTs, err) } func (sync *OplogSyncer) calculateWorkerLowestCheckpoint() (v int64, err error) { // no need to lock and eventually consistence is acceptable allAcked := true candidates := make([]int64, 0, len(sync.batcher.workerGroup)) allAckValues := make([]int64, 0, len(sync.batcher.workerGroup)) for _, worker := range sync.batcher.workerGroup { // read ack value first because of we don't wanna // a result of ack > unack. There wouldn't be cpu // reorder under atomic ! ack := atomic.LoadInt64(&worker.ack) unack := atomic.LoadInt64(&worker.unack) if ack == 0 && unack == 0 { // have no oplogs synced in this worker. skip } else if ack == unack || worker.IsAllAcked() { // all oplogs have been acked for right now or previous status worker.AllAcked(true) allAckValues = append(allAckValues, ack) } else if unack > ack { // most likely. partial oplogs acked (0 is possible) candidates = append(candidates, ack) allAcked = false } else if unack < ack && unack == 0 { // collector restarts. receiver unack value if from buffer // this is rarely happened. However we have delayed for // a bit log time. so we could use it allAcked = false } else if unack < ack && unack != 0 { // we should wait the bigger unack follows up the ack // they (unack and ack) will be equivalent soon ! return 0, fmt.Errorf("candidates should follow up unack[%d] ack[%d]", unack, ack) } } if allAcked && len(allAckValues) != 0 { // free to choose the maximum value. ascend order // the last one is the biggest sort.Sort(utils.Int64Slice(allAckValues)) return allAckValues[len(allAckValues)-1], nil } if len(candidates) == 0 { return 0, errors.New("no candidates ack values found") } // ascend order. first is the smallest sort.Sort(utils.Int64Slice(candidates)) if candidates[0] == 0 { return 0, errors.New("smallest candidates is zero") } LOG.Info("worker offset %v use lowest %v", candidates, utils.ExtractTimestampForLog(candidates[0])) return candidates[0], nil }