in collector/batcher.go [206:267]
// getBatchWithDelay fetches the next merged oplog batch and applies the
// exit-point and target-delay policies to it before returning.
//
// The first result is the batch to process. The second result is true when
// the batch crossed the configured exit point; in that case the batch is
// truncated to the entries that precede the first oplog whose timestamp is
// greater than the exit point. A nil or empty batch is returned unchanged
// together with false.
func (batcher *Batcher) getBatchWithDelay() ([]*oplog.GenericOplog, bool) {
	var mergeBatch []*oplog.GenericOplog
	if !batcher.utBatchesDelay.flag {
		mergeBatch = batcher.getBatch()
	} else { // for ut only
		mergeBatch = batcher.utBatchesDelay.injectBatch
	}

	// Check the length instead of comparing against nil: the code below
	// indexes the first and last elements, so a non-nil but empty batch would
	// otherwise panic with an index-out-of-range error.
	if len(mergeBatch) == 0 {
		return mergeBatch, false
	}

	// judge should exit
	exitPoint := getExitPoint()
	lastOplog := mergeBatch[len(mergeBatch)-1].Parsed
	if !exitPoint.IsZero() &&
		primitive.CompareTimestamp(lastOplog.Timestamp, batcher.syncer.fullSyncFinishPosition) > 0 &&
		primitive.CompareTimestamp(exitPoint, lastOplog.Timestamp) < 0 {
		// only run the detailed judgement when the exit point is smaller than
		// the last oplog in this batch
		LOG.Info("%s exitPoint[%v] < lastOplog.Timestamp[%v]", batcher.syncer, exitPoint, lastOplog.Timestamp)

		// Locate the first oplog that lies beyond the exit point. The check
		// above guarantees that at least the last entry matches, so the loop
		// always terminates through the break.
		var i int
		for i = range mergeBatch {
			if primitive.CompareTimestamp(exitPoint, mergeBatch[i].Parsed.Timestamp) < 0 {
				LOG.Info("%s exitPoint[%v] < current.Timestamp[%v]", batcher.syncer,
					exitPoint, mergeBatch[i].Parsed.Timestamp)
				break
			}
		}
		return mergeBatch[:i], true
	}

	// judge whether should delay
	if getTargetDelay() <= 0 {
		return mergeBatch, false
	}

	firstOplog := mergeBatch[0].Parsed
	// do not wait delay when oplog time is not greater than fullSyncFinishPosition
	if primitive.CompareTimestamp(firstOplog.Timestamp, batcher.syncer.fullSyncFinishPosition) <= 0 {
		return mergeBatch, false
	}

	for {
		// re-fetch delay in every round so that a changed setting takes
		// effect while waiting
		delay := getTargetDelay()
		delayBoundary := time.Now().Unix() - delay + 3 // 3 is for NTP drift
		if utils.ExtractMongoTimestamp(firstOplog.Timestamp) <= delayBoundary {
			break
		}

		LOG.Info("%s --- wait target delay[%v seconds]: "+
			"first oplog timestamp[%v] > delayBoundary[%v], fullSyncFinishPosition[%v]",
			batcher.syncer, delay, firstOplog.Timestamp, delayBoundary,
			batcher.syncer.fullSyncFinishPosition)
		time.Sleep(5 * time.Second)

		// for ut only
		batcher.utBatchesDelay.delay++
	}
	return mergeBatch, false
}