in collector/checkpoint.go [81:147]
func (sync *OplogSyncer) checkpoint(flush bool, inputTs int64) {
now := time.Now()
// do checkpoint every once in a while
if !flush && sync.ckptTime.Add(time.Duration(conf.Options.CheckpointInterval)*time.Millisecond).After(now) {
LOG.Debug("do not repeat update checkpoint in %v milliseconds", conf.Options.CheckpointInterval)
return
}
// we force update the ckpt time even failed
sync.ckptTime = now
LOG.Info("checkpoint update sync.ckptTime to:%v", sync.ckptTime)
// we delayed a few minutes to tolerate the receiver's flush buffer
// in AckRequired() tunnel. such as "rpc". While collector is restarted,
// we can't get the correct worker ack offset since collector have lost
// the unack offset...
if !flush && conf.Options.Tunnel != utils.VarTunnelDirect &&
now.Before(sync.startTime.Add(1*time.Minute)) {
LOG.Info("CheckpointOperation requires three minutes at least to flush receiver's buffer")
return
}
// read all workerGroup self ckpt. get minimum of all updated checkpoint
inMemoryTs := sync.ckptManager.GetInMemory().Timestamp
var lowest int64 = 0
var err error
if inputTs > 0 {
// use inputTs if inputTs is > 0
lowest = inputTs
} else {
lowest, err = sync.calculateWorkerLowestCheckpoint()
}
LOG.Info("checkpoint func lowest:%v inMemoryTs:%v flush:%v inputTs:%v",
utils.ExtractTimestampForLog(lowest), utils.ExtractTimestampForLog(inMemoryTs), flush, inputTs)
lowestInt64 := lowest
// if all oplogs from disk has been replayed successfully, store the newest oplog timestamp
if conf.Options.FullSyncReaderOplogStoreDisk && sync.persister.diskQueueLastTs > 0 {
if lowestInt64 >= sync.persister.diskQueueLastTs {
sync.ckptManager.SetOplogDiskFinishTs(sync.persister.diskQueueLastTs)
sync.persister.diskQueueLastTs = -2 // mark -1 so next time won't call
}
}
if lowest > 0 && err == nil {
switch {
case lowestInt64 > inMemoryTs:
if err = sync.ckptManager.Update(lowestInt64); err == nil {
LOG.Info("CheckpointOperation write success. updated from %v to %v",
utils.ExtractTimestampForLog(inMemoryTs), utils.ExtractTimestampForLog(lowest))
sync.replMetric.AddCheckpoint(1)
sync.replMetric.SetLSNCheckpoint(lowest)
return
}
case lowestInt64 < inMemoryTs:
LOG.Info("CheckpointOperation calculated[%v] is smaller than value in memory[%v]",
utils.ExtractTimestampForLog(lowest), utils.ExtractTimestampForLog(inMemoryTs))
return
case lowestInt64 == inMemoryTs:
return
}
}
// this log will be print if no ack calculated
LOG.Warn("CheckpointOperation updated is not suitable. lowest [%d]. current [%v]. inputTs [%v]. reason : %v",
lowest, utils.ExtractTimestampForLog(inMemoryTs), inputTs, err)
}