in collector/syncer.go [231:369]
// startBatcher hands the batching loop body to nimo.GoRoutineInLoop. Each
// invocation of the body does one of the following:
//
//   - yields and returns when this node is not the quorum master;
//   - on an exit signal, dispatches what is left, flushes the checkpoint,
//     sets sync.CanClose and then blocks forever;
//   - when the batch holds usable oplogs, dispatches them and checkpoints;
//   - otherwise (nothing usable in the batch), decides whether the checkpoint
//     should be moved forward to the newest filtered oplog.
//
// filterFlag and filterCheckTs are captured by the loop body and carry state
// between invocations: they record that the previous round also ended in the
// "nothing usable" path and when that streak was first observed.
func (sync *OplogSyncer) startBatcher() {
	batcher := sync.batcher
	filterCheckTs := time.Now()
	filterFlag := false // true once a round ended with only filtered oplogs

	nimo.GoRoutineInLoop(func() {
		// Only the master does any batching work.
		if !quorum.IsMaster() {
			utils.YieldInMs(DurationTime)
			return
		}

		// Pull as many oplogs as possible out of the log queues in one go.
		batchedOplog, barrier, allEmpty, exit := batcher.BatchMore()

		// Gid filtering would fit better inside BatchMore; it is applied here
		// so that BatchMore itself stays untouched.
		if conf.Options.FilterOplogGids {
			if err := sync.filterOplogGid(batchedOplog); err != nil {
				LOG.Crash("%v", err)
			}
		}

		// pushToWorkers dispatches the current batch and, only if the
		// dispatch reports work done, publishes ts as the LSN metric and as
		// the reader's in-memory query timestamp.
		pushToWorkers := func(ts int64) {
			if batcher.dispatchBatches(batchedOplog) {
				sync.replMetric.SetLSN(ts)
				sync.reader.UpdateQueryTimestamp(ts)
			}
		}

		if exit {
			LOG.Info("%s find exit signal", sync)

			// The checkpoint must be brought up to date before stopping.
			lastLog, lastFilterLog := batcher.getLastOplog()
			var newestTs int64 = 1 // fallback when no oplog timestamp is usable
			switch {
			case lastLog != nil && utils.TimeStampToInt64(lastLog.Timestamp) > newestTs:
				newestTs = utils.TimeStampToInt64(lastLog.Timestamp)
			case lastFilterLog != nil:
				// Fall back to the filtered oplog only when the regular
				// one did not supply a timestamp.
				newestTs = utils.TimeStampToInt64(lastFilterLog.Timestamp)
			}

			if lastLog != nil && !allEmpty {
				pushToWorkers(newestTs)
			}

			// Flush the checkpoint, then verify it against newestTs.
			sync.checkpoint(true, 0)
			sync.checkCheckpointUpdate(true, newestTs)
			sync.CanClose = true
			LOG.Info("%s blocking and waiting exits, checkpoint: %v", sync, utils.ExtractTimestampForLog(newestTs))
			select {} // park here; the outer routine is responsible for exiting
		}

		lastLog, lastFilterLog := batcher.getLastOplog()

		// Regular path: the batch carries something to dispatch.
		if lastLog != nil && !allEmpty {
			newestTs := utils.TimeStampToInt64(lastLog.Timestamp)
			pushToWorkers(newestTs)
			filterFlag = false

			// Flush the checkpoint, then verify it against newestTs.
			sync.checkpoint(barrier, 0)
			sync.checkCheckpointUpdate(barrier, newestTs)
			return
		}

		// From here on nothing in this round was dispatched; the only
		// question left is whether the filtered oplog should advance the
		// checkpoint.
		if lastFilterLog == nil {
			LOG.Debug("%s filterLog is nil", sync)
			return
		}
		if utils.TimeStampToInt64(lastFilterLog.Timestamp) <= sync.ckptManager.GetInMemory().Timestamp {
			// The in-memory checkpoint is already at or past the filtered oplog.
			LOG.Debug("%s filterLogTs[%v] is small than ckptTs[%v], skip this filterLogTs", sync,
				lastFilterLog.Timestamp, utils.ExtractTimestampForLog(sync.ckptManager.GetInMemory().Timestamp))
			return
		}

		now := time.Now()
		if !filterFlag {
			// First round of a filtered-only streak: remember when it began.
			filterFlag = true
			filterCheckTs = now
			return
		}
		// Proceed only after the streak has lasted longer than
		// FilterCheckpointCheckInterval seconds.
		if !now.After(filterCheckTs.Add(FilterCheckpointCheckInterval * time.Second)) {
			return
		}

		checkpointTs := utils.ExtractMongoTimestamp(sync.ckptManager.GetInMemory().Timestamp)
		filterNewestTs := utils.ExtractMongoTimestamp(lastFilterLog.Timestamp)
		// NOTE(review): FilterCheckpointGap is presumably expressed in the
		// same unit as the extracted timestamps (seconds) - confirm.
		checkpointLagging := filterNewestTs-FilterCheckpointGap > checkpointTs
		if !checkpointLagging {
			LOG.Debug("%s filterLogTs[%v] not bigger than checkpoint[%v]",
				sync, lastFilterLog.Timestamp,
				utils.ExtractTimestampForLog(sync.ckptManager.GetInMemory().Timestamp))
			return
		}

		// The checkpoint trails the filtered oplog by more than the allowed
		// gap, so move it forward to the filtered oplog's timestamp.
		newestTs := utils.TimeStampToInt64(lastFilterLog.Timestamp)
		LOG.Info("%s try to update checkpoint mandatory from %v to %v", sync,
			utils.ExtractTimestampForLog(sync.ckptManager.GetInMemory().Timestamp),
			lastFilterLog.Timestamp)
		filterFlag = false

		if lastLog == nil {
			LOG.Info("%s last log is empty, skip waiting checkpoint updated", sync)
		} else {
			newestTsLog := utils.ExtractTimestampForLog(newestTs)
			if newestTs < utils.TimeStampToInt64(lastLog.Timestamp) {
				LOG.Error("%s filter newestTs[%v] smaller than previous timestamp[%v]",
					sync, newestTsLog, lastLog.Timestamp)
			}
			LOG.Info("%s waiting last checkpoint[%v] updated", sync, newestTsLog)
			// Verify the checkpoint against the last regular oplog first.
			status := sync.checkCheckpointUpdate(true, utils.TimeStampToInt64(lastLog.Timestamp))
			LOG.Info("%s last checkpoint[%v] updated [%v]", sync, newestTsLog, status)
		}

		// Publish the filtered timestamp to the reader, then checkpoint with it.
		sync.reader.UpdateQueryTimestamp(newestTs)
		sync.checkpoint(false, newestTs)
	})
}