collector/persister.go

package collector // persist oplog on disk

import (
	"sync"
	"sync/atomic"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"

	conf "github.com/alibaba/MongoShake/v2/collector/configure"
	utils "github.com/alibaba/MongoShake/v2/common"
	"github.com/alibaba/MongoShake/v2/oplog"

	nimo "github.com/gugemichael/nimo4go"
	diskQueue "github.com/vinllen/go-diskqueue"
	LOG "github.com/vinllen/log4go"
)

const (
	FullSyncReaderOplogStoreDiskReadBatch = 10000
)

type Persister struct {
	replset string       // name
	sync    *OplogSyncer // not owned, inner call

	// batch data([]byte) together and send to downstream
	Buffer            [][]byte
	nextQueuePosition uint64

	// enable disk persist
	enableDiskPersist bool

	// stage of fetch and store oplog
	fetchStage int32
	// disk queue used to store oplog temporarily
	DiskQueue       *diskQueue.DiskQueue
	diskQueueMutex  sync.Mutex // disk queue mutex
	diskQueueLastTs int64      // the last oplog timestamp in the disk queue (full timestamp, has T + I)

	// metric info, used in print
	diskWriteCount uint64
	diskReadCount  uint64
}

func NewPersister(replset string, sync *OplogSyncer) *Persister {
	p := &Persister{
		replset:           replset,
		sync:              sync,
		Buffer:            make([][]byte, 0, conf.Options.IncrSyncFetcherBufferCapacity),
		nextQueuePosition: 0,
		enableDiskPersist: conf.Options.SyncMode == utils.VarSyncModeAll &&
			conf.Options.FullSyncReaderOplogStoreDisk,
		fetchStage:      utils.FetchStageStoreUnknown,
		diskQueueLastTs: -1, // initially set to -1
	}

	return p
}

func (p *Persister) Start() {
	if p.enableDiskPersist {
		go p.retrieve()
	}
}

func (p *Persister) SetFetchStage(fetchStage int32) {
	LOG.Info("persister replset[%v] update fetch status to: %v", p.replset, utils.LogFetchStage(fetchStage))
	atomic.StoreInt32(&p.fetchStage, fetchStage)
}

func (p *Persister) GetFetchStage() int32 {
	return atomic.LoadInt32(&p.fetchStage)
}

func (p *Persister) InitDiskQueue(dqName string) {
	fetchStage := p.GetFetchStage()
	// the fetchStage shouldn't change during initialization
	if fetchStage != utils.FetchStageStoreDiskNoApply && fetchStage != utils.FetchStageStoreDiskApply {
		LOG.Crashf("persister replset[%v] init disk queue in illegal fetchStage %v",
			p.replset, utils.LogFetchStage(fetchStage))
	}
	if p.DiskQueue != nil {
		LOG.Crashf("init disk queue failed: already exist")
	}

	p.DiskQueue = diskQueue.NewDiskQueue(dqName, conf.Options.LogDirectory,
		conf.Options.FullSyncReaderOplogStoreDiskMaxSize, FullSyncReaderOplogStoreDiskReadBatch,
		1<<30, 0, 1<<26,
		1000, 2*time.Second)
}

func (p *Persister) GetQueryTsFromDiskQueue() primitive.Timestamp {
	if p.DiskQueue == nil {
		LOG.Crashf("persister replset[%v] get query timestamp from nil disk queue", p.replset)
	}

	logData := p.DiskQueue.GetLastWriteData()
	if len(logData) == 0 {
		return primitive.Timestamp{}
	}

	if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog {
		log := new(oplog.PartialLog)
		if err := bson.Unmarshal(logData, log); err != nil {
			LOG.Crashf("unmarshal oplog[%v] failed[%v]", logData, err)
		}

		// assert
		if log.Namespace == "" {
			LOG.Crashf("unmarshal data to oplog failed: %v", log)
		}
		return log.Timestamp
	} else {
		// change_stream
		log := new(oplog.Event)
		if err := bson.Unmarshal(logData, log); err != nil {
			LOG.Crashf("unmarshal oplog[%v] failed[%v]", logData, err)
		}

		// assert
		if log.OperationType == "" {
			LOG.Crashf("unmarshal data to change stream event failed: %v", log)
		}
		return log.ClusterTime
	}
}

// Inject routes fetched oplog data either to the in-memory pending queue or to
// the disk queue, depending on the current fetch stage.
func (p *Persister) Inject(input []byte) {
	// only used to test the reader: may discard or just print the input
	switch conf.Options.IncrSyncReaderDebug {
	case utils.VarIncrSyncReaderDebugNone:
		break
	case utils.VarIncrSyncReaderDebugDiscard:
		return
	case utils.VarIncrSyncReaderDebugPrint:
		var test interface{}
		bson.Unmarshal(input, &test)
		LOG.Info("print debug: %v", test)
	default:
		break
	}

	if p.enableDiskPersist {
		// current fetch stage
		fetchStage := p.GetFetchStage()
		if fetchStage == utils.FetchStageStoreMemoryApply {
			p.PushToPendingQueue(input)
		} else if p.DiskQueue != nil {
			if input == nil {
				// no need to store
				return
			}

			// store locally; hold the mutex so retrieve() can't drain the disk
			// queue while we are writing to it, and release it on return
			p.diskQueueMutex.Lock()
			defer p.diskQueueMutex.Unlock()
			if p.DiskQueue != nil { // double check
				// should send to diskQueue
				atomic.AddUint64(&p.diskWriteCount, 1)
				if err := p.DiskQueue.Put(input); err != nil {
					LOG.Crashf("persister inject replset[%v] put oplog to disk queue failed[%v]",
						p.replset, err)
				}
			} else {
				// should send to pending queue
				p.PushToPendingQueue(input)
			}
		} else {
			LOG.Crashf("persister inject replset[%v] has no diskQueue with fetch stage[%v]",
				p.replset, utils.LogFetchStage(fetchStage))
		}
	} else {
		p.PushToPendingQueue(input)
	}
}

func (p *Persister) PushToPendingQueue(input []byte) {
	flush := false
	if input != nil {
		p.Buffer = append(p.Buffer, input)
	} else {
		flush = true
	}

	if len(p.Buffer) >= conf.Options.IncrSyncFetcherBufferCapacity || (flush && len(p.Buffer) != 0) {
		// round-robin over the pending queues; we simply keep incrementing
		// nextQueuePosition and assume a uint64 never overflows in practice
		selected := int(p.nextQueuePosition % uint64(len(p.sync.PendingQueue)))
		p.sync.PendingQueue[selected] <- p.Buffer

		// allocate a new Buffer instead of "p.Buffer = p.Buffer[:0]": the old
		// backing array has already been handed over on the channel and must
		// not be reused.
		p.Buffer = make([][]byte, 0, conf.Options.IncrSyncFetcherBufferCapacity)

		// queue position = (queue position + 1) % n
		p.nextQueuePosition++
	}
}

func (p *Persister) retrieve() {
	// wait until the oplogs stored on disk are allowed to be applied
Wait:
	for range time.NewTicker(3 * time.Second).C {
		stage := atomic.LoadInt32(&p.fetchStage)
		switch stage {
		case utils.FetchStageStoreDiskApply:
			break Wait
		case utils.FetchStageStoreUnknown:
			// do nothing
		case utils.FetchStageStoreDiskNoApply:
			// do nothing
		default:
			LOG.Crashf("invalid fetch stage[%v]", utils.LogFetchStage(stage))
		}
	}

	LOG.Info("persister retrieve for replset[%v] begin to read from disk queue with depth[%v]",
		p.replset, p.DiskQueue.Depth())
	ticker := time.NewTicker(time.Second)
Loop:
	for {
		select {
		case readData := <-p.DiskQueue.ReadChan():
			if len(readData) == 0 {
				continue
			}

			atomic.AddUint64(&p.diskReadCount, uint64(len(readData)))
			for _, data := range readData {
				p.PushToPendingQueue(data)
			}

			// move to next read
			if err := p.DiskQueue.Next(); err != nil {
				LOG.Crashf("persister replset[%v] retrieve get next failed[%v]", p.replset, err)
			}
		case <-ticker.C:
			// less than a full batch left means no more data is being batched
			if p.DiskQueue.Depth() < p.DiskQueue.BatchCount() {
				break Loop
			}
		}
	}

	LOG.Info("persister retrieve for replset[%v] block fetch with disk queue depth[%v]",
		p.replset, p.DiskQueue.Depth())

	// wait to finish retrieve and continue fetch to store to memory
	p.diskQueueMutex.Lock()
	defer p.diskQueueMutex.Unlock() // lock till the end
	readData := p.DiskQueue.ReadAll()
	if len(readData) > 0 {
		atomic.AddUint64(&p.diskReadCount, uint64(len(readData)))
		for _, data := range readData {
			// or.oplogChan <- &retOplog{&bson.Raw{Kind: 3, Data: data}, nil}
			p.PushToPendingQueue(data)
		}

		// parse the last oplog timestamp
		p.diskQueueLastTs = utils.TimeStampToInt64(p.GetQueryTsFromDiskQueue())

		if err := p.DiskQueue.Next(); err != nil {
			LOG.Crash(err)
		}
	}
	if p.DiskQueue.Depth() != 0 {
		LOG.Crashf("persister retrieve for replset[%v] finish, but disk queue depth[%v] is not empty",
			p.replset, p.DiskQueue.Depth())
	}
	p.SetFetchStage(utils.FetchStageStoreMemoryApply)

	if err := p.DiskQueue.Delete(); err != nil {
		LOG.Critical("persister retrieve for replset[%v] close disk queue error. %v", p.replset, err)
	}
	LOG.Info("persister retriever for replset[%v] exits", p.replset)
}

func (p *Persister) RestAPI() {
	type PersistNode struct {
		BufferUsed        int    `json:"buffer_used"`
		BufferSize        int    `json:"buffer_size"`
		EnableDiskPersist bool   `json:"enable_disk_persist"`
		FetchStage        string `json:"fetch_stage"`
		DiskWriteCount    uint64 `json:"disk_write_count"`
		DiskReadCount     uint64 `json:"disk_read_count"`
	}

	utils.IncrSyncHttpApi.RegisterAPI("/persist", nimo.HttpGet, func([]byte) interface{} {
		return &PersistNode{
			BufferSize:        conf.Options.IncrSyncFetcherBufferCapacity,
			BufferUsed:        len(p.Buffer),
			EnableDiskPersist: p.enableDiskPersist,
			FetchStage:        utils.LogFetchStage(p.GetFetchStage()),
			DiskWriteCount:    atomic.LoadUint64(&p.diskWriteCount),
			DiskReadCount:     atomic.LoadUint64(&p.diskReadCount),
		}
	})
}
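
// Usage sketch (illustrative only, not part of the upstream file): the oplog
// syncer is assumed to wire the persister roughly as follows during a full
// sync that stores oplogs on disk. The replset name, syncer value, disk-queue
// name and rawOplogBytes below are hypothetical placeholders.
//
//	p := NewPersister("shard-01", syncer)
//	p.SetFetchStage(utils.FetchStageStoreDiskNoApply)
//	p.InitDiskQueue("mongoshake-oplog-shard-01")
//	p.Start()               // launches retrieve() if disk persist is enabled
//	p.Inject(rawOplogBytes) // raw BSON bytes are buffered to the disk queue
//
//	// once the full sync finishes, allow retrieve() to drain the disk queue
//	// into the in-memory pending queues:
//	p.SetFetchStage(utils.FetchStageStoreDiskApply)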