func()

in collector/batcher.go [206:267]


// getBatchWithDelay fetches the next merged oplog batch and applies the
// exit-point and target-delay policies to it before returning.
//
// The batch is obtained from batcher.getBatch(), unless the unit-test hook
// batcher.utBatchesDelay.flag is set, in which case the injected batch is used.
//
// Returns:
//   - the oplogs to hand on: nil when there is nothing to process, the leading
//     part of the batch whose timestamps are not newer than the exit point
//     when the exit branch is taken, or the whole batch otherwise.
//   - true only when the exit branch was taken, false in every other case.
//
// When a positive target delay is configured and the first oplog is newer than
// fullSyncFinishPosition, this call blocks the calling goroutine, sleeping in
// 5-second steps, until the first oplog is old enough.
func (batcher *Batcher) getBatchWithDelay() ([]*oplog.GenericOplog, bool) {
	var mergeBatch []*oplog.GenericOplog
	if !batcher.utBatchesDelay.flag {
		mergeBatch = batcher.getBatch()
	} else { // for ut only
		mergeBatch = batcher.utBatchesDelay.injectBatch
	}
	// len() is zero for both a nil and an empty-but-non-nil slice. Checking
	// only for nil would let an empty batch through and the last-element
	// lookup below would index at -1 and panic. Returning nil keeps the
	// result identical to the existing "no batch" outcome.
	if len(mergeBatch) == 0 {
		return nil, false
	}

	// judge whether we should exit
	exitPoint := getExitPoint()
	lastOplog := mergeBatch[len(mergeBatch)-1].Parsed

	if !exitPoint.IsZero() &&
		primitive.CompareTimestamp(lastOplog.Timestamp, batcher.syncer.fullSyncFinishPosition) > 0 &&
		primitive.CompareTimestamp(exitPoint, lastOplog.Timestamp) < 0 {
		// The per-entry scan only runs when the exit point is smaller than the
		// last oplog timestamp, so the loop below always reaches its break (at
		// the latest on the final element).
		LOG.Info("%s exitPoint[%v] < lastOplog.Timestamp[%v]", batcher.syncer, exitPoint, lastOplog.Timestamp)
		var i int
		for i = range mergeBatch {
			// stop at the first oplog that is newer than the exit point
			if primitive.CompareTimestamp(exitPoint, mergeBatch[i].Parsed.Timestamp) < 0 {
				LOG.Info("%s exitPoint[%v] < current.Timestamp[%v]", batcher.syncer,
					exitPoint, mergeBatch[i].Parsed.Timestamp)
				break
			}
		}
		// keep everything before the first oplog past the exit point; this may
		// be an empty slice when the very first oplog is already past it
		return mergeBatch[:i], true
	}

	// judge whether we should delay
	delay := getTargetDelay()
	if delay > 0 {
		firstOplog := mergeBatch[0].Parsed
		// do not wait for the delay when the oplog time is not greater than
		// fullSyncFinishPosition
		if primitive.CompareTimestamp(firstOplog.Timestamp, batcher.syncer.fullSyncFinishPosition) > 0 {
			for {
				// re-fetch the delay in every round so that a change made
				// while we are sleeping is picked up
				delay = getTargetDelay()
				delayBoundary := time.Now().Unix() - delay + 3 // 3 is for NTP drift

				if utils.ExtractMongoTimestamp(firstOplog.Timestamp) > delayBoundary {
					LOG.Info("%s --- wait target delay[%v seconds]: "+
						"first oplog timestamp[%v] > delayBoundary[%v], fullSyncFinishPosition[%v]",
						batcher.syncer, delay, firstOplog.Timestamp, delayBoundary,
						batcher.syncer.fullSyncFinishPosition)
					time.Sleep(5 * time.Second)

					// for ut only: count how many sleep rounds were taken
					batcher.utBatchesDelay.delay++
				} else {
					break
				}
			}
		}
	}

	return mergeBatch, false
}