func()

in collector/syncer.go [231:369]


// startBatcher hands the batching loop body to nimo.GoRoutineInLoop.
//
// Each pass of the loop body:
//   - yields for DurationTime and returns when quorum.IsMaster reports false;
//   - pulls a batch through batcher.BatchMore and, when
//     conf.Options.FilterOplogGids is set, runs filterOplogGid over it (an
//     error there is passed to LOG.Crash);
//   - on the exit signal, dispatches what is left, flushes the checkpoint,
//     sets CanClose and then blocks forever;
//   - when a last oplog exists and the batch is not all-empty, dispatches the
//     batch and flushes the checkpoint;
//   - otherwise, moves the checkpoint to the last filtered oplog, but only
//     after this path has been hit continuously for more than
//     FilterCheckpointCheckInterval seconds and the filtered timestamp is more
//     than FilterCheckpointGap ahead of the in-memory checkpoint.
func (sync *OplogSyncer) startBatcher() {
	batcher := sync.batcher

	// State that survives between passes of the loop body: whether the
	// previous pass ended on the filtered-only path, and when that streak began.
	prevFiltered := false
	streakStart := time.Now()

	nimo.GoRoutineInLoop(func() {
		// Nothing to do unless this node is the master.
		if !quorum.IsMaster() {
			utils.YieldInMs(DurationTime)
			return
		}

		// Batch as much as possible from the logs queues; the batcher merges
		// oplogs from the different queues one by one, bounded by
		// AdaptiveBatchingMaxSize.
		batch, barrier, allEmpty, exit := batcher.BatchMore()

		// Gid filtering would fit better inside BatchMore, but it is kept here.
		if conf.Options.FilterOplogGids {
			if err := sync.filterOplogGid(batch); err != nil {
				LOG.Crash("%v", err)
			}
		}

		// dispatch pushes the batch to the workers; only when that reports
		// success are the LSN metric and the in-memory query timestamp updated.
		dispatch := func(ts int64) {
			if !batcher.dispatchBatches(batch) {
				return
			}
			sync.replMetric.SetLSN(ts)
			sync.reader.UpdateQueryTimestamp(ts)
		}

		if exit {
			LOG.Info("%s find exit signal", sync)
			// Make sure the checkpoint is updated before exiting.
			tail, filteredTail := batcher.getLastOplog()

			// Default is 1. The filtered oplog's timestamp is used only when
			// the last oplog did not raise the value above that default.
			finalTs := int64(1)
			switch {
			case tail != nil && utils.TimeStampToInt64(tail.Timestamp) > finalTs:
				finalTs = utils.TimeStampToInt64(tail.Timestamp)
			case filteredTail != nil:
				finalTs = utils.TimeStampToInt64(filteredTail.Timestamp)
			}

			if tail != nil && !allEmpty {
				dispatch(finalTs)
			}

			// Flush the checkpoint, then check whether it was updated.
			sync.checkpoint(true, 0)
			sync.checkCheckpointUpdate(true, finalTs)
			sync.CanClose = true
			LOG.Info("%s blocking and waiting exits, checkpoint: %v", sync, utils.ExtractTimestampForLog(finalTs))
			select {} // block forever, wait for the outer routine to exit
		}

		tail, filteredTail := batcher.getLastOplog()

		// Regular path: there is a last oplog and the batch carries something.
		if tail != nil && !allEmpty {
			ts := utils.TimeStampToInt64(tail.Timestamp)
			dispatch(ts)

			prevFiltered = false

			// Flush the checkpoint, then check whether it was updated.
			sync.checkpoint(barrier, 0)
			sync.checkCheckpointUpdate(barrier, ts)
			return
		}

		// From here on only the last filtered oplog can move the checkpoint.
		if filteredTail == nil {
			LOG.Debug("%s filterLog is nil", sync)
			return
		}
		if utils.TimeStampToInt64(filteredTail.Timestamp) <= sync.ckptManager.GetInMemory().Timestamp {
			LOG.Debug("%s filterLogTs[%v] is small than ckptTs[%v], skip this filterLogTs", sync,
				filteredTail.Timestamp, utils.ExtractTimestampForLog(sync.ckptManager.GetInMemory().Timestamp))
			return
		}

		now := time.Now()

		// First pass on this path since the flag was cleared: start the clock.
		if !prevFiltered {
			prevFiltered = true
			streakStart = now
			return
		}

		// Continue only once this path has been taken for more than
		// FilterCheckpointCheckInterval seconds.
		if deadline := streakStart.Add(FilterCheckpointCheckInterval * time.Second); !now.After(deadline) {
			return
		}

		ckptMongoTs := utils.ExtractMongoTimestamp(sync.ckptManager.GetInMemory().Timestamp)
		filteredMongoTs := utils.ExtractMongoTimestamp(filteredTail.Timestamp)
		if farAhead := filteredMongoTs-FilterCheckpointGap > ckptMongoTs; !farAhead {
			LOG.Debug("%s filterLogTs[%v] not bigger than checkpoint[%v]",
				sync, filteredTail.Timestamp,
				utils.ExtractTimestampForLog(sync.ckptManager.GetInMemory().Timestamp))
			return
		}

		// The checkpoint lags the filtered oplog by more than
		// FilterCheckpointGap: force it forward.
		forcedTs := utils.TimeStampToInt64(filteredTail.Timestamp)
		LOG.Info("%s try to update checkpoint mandatory from %v to %v", sync,
			utils.ExtractTimestampForLog(sync.ckptManager.GetInMemory().Timestamp),
			filteredTail.Timestamp)

		prevFiltered = false

		if tail == nil {
			LOG.Info("%s last log is empty, skip waiting checkpoint updated", sync)
		} else {
			forcedTsLog := utils.ExtractTimestampForLog(forcedTs)
			if forcedTs < utils.TimeStampToInt64(tail.Timestamp) {
				LOG.Error("%s filter newestTs[%v] smaller than previous timestamp[%v]",
					sync, forcedTsLog, tail.Timestamp)
			}

			LOG.Info("%s waiting last checkpoint[%v] updated", sync, forcedTsLog)
			// Check the checkpoint update against the last oplog's timestamp
			// before flushing the forced value below.
			status := sync.checkCheckpointUpdate(true, utils.TimeStampToInt64(tail.Timestamp))
			LOG.Info("%s last checkpoint[%v] updated [%v]", sync, forcedTsLog, status)
		}

		// Update the latest fetched timestamp in memory, then flush the
		// checkpoint with the newest filtered oplog value.
		sync.reader.UpdateQueryTimestamp(forcedTs)
		sync.checkpoint(false, forcedTs)
	})
}