func()

in collector/checkpoint.go [81:147]


func (sync *OplogSyncer) checkpoint(flush bool, inputTs int64) {
	now := time.Now()

	// do checkpoint every once in a while
	if !flush && sync.ckptTime.Add(time.Duration(conf.Options.CheckpointInterval)*time.Millisecond).After(now) {
		LOG.Debug("do not repeat update checkpoint in %v milliseconds", conf.Options.CheckpointInterval)
		return
	}
	// we force update the ckpt time even failed
	sync.ckptTime = now
	LOG.Info("checkpoint update sync.ckptTime to:%v", sync.ckptTime)

	// we delayed a few minutes to tolerate the receiver's flush buffer
	// in AckRequired() tunnel. such as "rpc". While collector is restarted,
	// we can't get the correct worker ack offset since collector have lost
	// the unack offset...
	if !flush && conf.Options.Tunnel != utils.VarTunnelDirect &&
		now.Before(sync.startTime.Add(1*time.Minute)) {
		LOG.Info("CheckpointOperation requires three minutes at least to flush receiver's buffer")
		return
	}

	// read all workerGroup self ckpt. get minimum of all updated checkpoint
	inMemoryTs := sync.ckptManager.GetInMemory().Timestamp
	var lowest int64 = 0
	var err error
	if inputTs > 0 {
		// use inputTs if inputTs is > 0
		lowest = inputTs
	} else {
		lowest, err = sync.calculateWorkerLowestCheckpoint()
	}
	LOG.Info("checkpoint func lowest:%v inMemoryTs:%v flush:%v inputTs:%v",
		utils.ExtractTimestampForLog(lowest), utils.ExtractTimestampForLog(inMemoryTs), flush, inputTs)

	lowestInt64 := lowest
	// if all oplogs from disk has been replayed successfully, store the newest oplog timestamp
	if conf.Options.FullSyncReaderOplogStoreDisk && sync.persister.diskQueueLastTs > 0 {
		if lowestInt64 >= sync.persister.diskQueueLastTs {
			sync.ckptManager.SetOplogDiskFinishTs(sync.persister.diskQueueLastTs)
			sync.persister.diskQueueLastTs = -2 // mark -1 so next time won't call
		}
	}

	if lowest > 0 && err == nil {
		switch {
		case lowestInt64 > inMemoryTs:
			if err = sync.ckptManager.Update(lowestInt64); err == nil {
				LOG.Info("CheckpointOperation write success. updated from %v to %v",
					utils.ExtractTimestampForLog(inMemoryTs), utils.ExtractTimestampForLog(lowest))
				sync.replMetric.AddCheckpoint(1)
				sync.replMetric.SetLSNCheckpoint(lowest)
				return
			}
		case lowestInt64 < inMemoryTs:
			LOG.Info("CheckpointOperation calculated[%v] is smaller than value in memory[%v]",
				utils.ExtractTimestampForLog(lowest), utils.ExtractTimestampForLog(inMemoryTs))
			return
		case lowestInt64 == inMemoryTs:
			return
		}
	}

	// this log will be print if no ack calculated
	LOG.Warn("CheckpointOperation updated is not suitable. lowest [%d]. current [%v]. inputTs [%v]. reason : %v",
		lowest, utils.ExtractTimestampForLog(inMemoryTs), inputTs, err)
}