func()

in collector/persister.go [205:279]


// retrieve moves the oplogs buffered in the disk queue into the pending queue
// and then switches the persister to the in-memory apply stage.
//
// It works in three phases:
//  1. Poll fetchStage every 3 seconds until it equals
//     utils.FetchStageStoreDiskApply. The Unknown and DiskNoApply stages keep
//     waiting; any other value is reported through LOG.Crashf.
//  2. Consume batches from DiskQueue.ReadChan(), pushing every entry to the
//     pending queue, until a once-per-second check sees the queue depth drop
//     below one batch.
//  3. While holding diskQueueMutex, read the remainder with ReadAll, record
//     the last oplog timestamp, verify the queue is empty, set the fetch stage
//     to utils.FetchStageStoreMemoryApply and delete the disk queue.
func (p *Persister) retrieve() {
	stageTicker := time.NewTicker(3 * time.Second)
WaitStage:
	for range stageTicker.C {
		stage := atomic.LoadInt32(&p.fetchStage)
		switch stage {
		case utils.FetchStageStoreDiskApply:
			// A bare break would only leave the switch and the ticker channel
			// is never closed, so the loop must be exited by label.
			break WaitStage
		case utils.FetchStageStoreUnknown, utils.FetchStageStoreDiskNoApply:
			// not ready to apply yet, keep polling
		default:
			LOG.Crashf("invalid fetch stage[%v]", utils.LogFetchStage(stage))
		}
	}
	// The stage poll is finished; release the ticker instead of letting it
	// fire for the rest of the (potentially long) drain below.
	stageTicker.Stop()

	LOG.Info("persister retrieve for replset[%v] begin to read from disk queue with depth[%v]",
		p.replset, p.DiskQueue.Depth())
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
Loop:
	for {
		select {
		case readData := <-p.DiskQueue.ReadChan():
			if len(readData) == 0 {
				continue
			}

			atomic.AddUint64(&p.diskReadCount, uint64(len(readData)))
			for _, data := range readData {
				p.PushToPendingQueue(data)
			}

			// move to next read
			if err := p.DiskQueue.Next(); err != nil {
				LOG.Crashf("persister replset[%v] retrieve get next failed[%v]", p.replset, err)
			}
		case <-ticker.C:
			// Less than one full batch is left: stop batch reading and let the
			// locked section below take the remainder in one go.
			if p.DiskQueue.Depth() < p.DiskQueue.BatchCount() {
				break Loop
			}
		}
	}

	LOG.Info("persister retrieve for replset[%v] block fetch with disk queue depth[%v]",
		p.replset, p.DiskQueue.Depth())

	// wait to finish retrieve and continue fetch to store to memory
	p.diskQueueMutex.Lock()
	defer p.diskQueueMutex.Unlock() // lock till the end
	readData := p.DiskQueue.ReadAll()
	if len(readData) > 0 {
		atomic.AddUint64(&p.diskReadCount, uint64(len(readData)))
		for _, data := range readData {
			p.PushToPendingQueue(data)
		}

		// parse the last oplog timestamp
		p.diskQueueLastTs = utils.TimeStampToInt64(p.GetQueryTsFromDiskQueue())

		if err := p.DiskQueue.Next(); err != nil {
			LOG.Crash(err)
		}
	}
	// Everything must have been consumed before switching to memory mode.
	if p.DiskQueue.Depth() != 0 {
		LOG.Crashf("persister retrieve for replset[%v] finish, but disk queue depth[%v] is not empty",
			p.replset, p.DiskQueue.Depth())
	}
	p.SetFetchStage(utils.FetchStageStoreMemoryApply)

	// Failing to remove the disk queue is logged but does not abort the exit.
	if err := p.DiskQueue.Delete(); err != nil {
		LOG.Critical("persister retrieve for replset[%v] close disk queue error. %v", p.replset, err)
	}
	LOG.Info("persister retriever for replset[%v] exits", p.replset)
}