in collector/persister.go [205:279]
// retrieve moves everything stored in the disk queue into the pending queue
// and then switches the persister to the memory-apply stage.
//
// It runs in three phases:
//  1. Poll fetchStage every 3 seconds until it becomes
//     utils.FetchStageStoreDiskApply; any stage other than the known
//     "store" stages is treated as fatal.
//  2. Drain the disk queue batch by batch through ReadChan until the
//     remaining depth drops below one batch.
//  3. Take diskQueueMutex, read whatever is left with ReadAll, record the
//     timestamp of the last oplog, set the fetch stage to
//     utils.FetchStageStoreMemoryApply and delete the disk queue.
//
// The mutex is held from phase 3 until the function returns.
func (p *Persister) retrieve() {
	// Phase 1: wait for the stage that allows applying data from disk.
	stageTicker := time.NewTicker(3 * time.Second)
WaitStage:
	for range stageTicker.C {
		stage := atomic.LoadInt32(&p.fetchStage)
		switch stage {
		case utils.FetchStageStoreDiskApply:
			// A bare "break" would only leave the switch and the ticker
			// channel is never closed, so the loop label is required to
			// actually proceed to the drain phase.
			break WaitStage
		case utils.FetchStageStoreUnknown:
			// do nothing
		case utils.FetchStageStoreDiskNoApply:
			// do nothing
		default:
			LOG.Crashf("invalid fetch stage[%v]", utils.LogFetchStage(stage))
		}
	}
	// The ticker is no longer needed; stop it so its resources are released.
	stageTicker.Stop()

	LOG.Info("persister retrieve for replset[%v] begin to read from disk queue with depth[%v]",
		p.replset, p.DiskQueue.Depth())

	// Phase 2: drain full batches from the disk queue.
	ticker := time.NewTicker(time.Second)
Loop:
	for {
		select {
		case readData := <-p.DiskQueue.ReadChan():
			if len(readData) == 0 {
				continue
			}

			atomic.AddUint64(&p.diskReadCount, uint64(len(readData)))
			for _, data := range readData {
				p.PushToPendingQueue(data)
			}

			// move to next read
			if err := p.DiskQueue.Next(); err != nil {
				LOG.Crashf("persister replset[%v] retrieve get next failed[%v]", p.replset, err)
			}
		case <-ticker.C:
			// check no more data batching?
			if p.DiskQueue.Depth() < p.DiskQueue.BatchCount() {
				break Loop
			}
		}
	}
	ticker.Stop()

	LOG.Info("persister retrieve for replset[%v] block fetch with disk queue depth[%v]",
		p.replset, p.DiskQueue.Depth())

	// Phase 3: wait to finish retrieve and continue fetch to store to memory.
	p.diskQueueMutex.Lock()
	defer p.diskQueueMutex.Unlock() // lock till the end

	readData := p.DiskQueue.ReadAll()
	if len(readData) > 0 {
		atomic.AddUint64(&p.diskReadCount, uint64(len(readData)))
		for _, data := range readData {
			p.PushToPendingQueue(data)
		}

		// parse the last oplog timestamp
		p.diskQueueLastTs = utils.TimeStampToInt64(p.GetQueryTsFromDiskQueue())

		if err := p.DiskQueue.Next(); err != nil {
			LOG.Crash(err)
		}
	}

	// Everything must have been consumed at this point.
	if p.DiskQueue.Depth() != 0 {
		LOG.Crashf("persister retrieve for replset[%v] finish, but disk queue depth[%v] is not empty",
			p.replset, p.DiskQueue.Depth())
	}

	p.SetFetchStage(utils.FetchStageStoreMemoryApply)

	// A failure to delete the disk queue is logged but not fatal.
	if err := p.DiskQueue.Delete(); err != nil {
		LOG.Critical("persister retrieve for replset[%v] close disk queue error. %v", p.replset, err)
	}
	LOG.Info("persister retriever for replset[%v] exits", p.replset)
}