in nimo-shake/incr-sync/syncer.go [593:648]
// ckptManager periodically persists this dispatcher's checkpoint for its shard.
//
// It queries the stored checkpoint once at start-up and crashes on any query
// error other than utils.NotFountErr. Then, on every tick of a
// CheckpointFlushInterval-second ticker, until d.close is observed, it writes
// one of:
//   - the shard iterator (d.shardIt), while no sequence number has been
//     recorded yet (d.checkpointPosition is empty), or
//   - the current sequence number, when it differs from the last one that was
//     written successfully and is not below the initial checkpoint.
//
// A failed write is logged and retried on the next tick.
func (d *Dispatcher) ckptManager() {
	// Last sequence number that was persisted successfully.
	var prevCkptPosition string

	initCkpt, err := d.ckptWriter.Query(*d.shard.Shard.ShardId, d.ns.Collection)
	if err != nil && err.Error() != utils.NotFountErr {
		LOG.Crashf("%s query checkpoint failed[%v]", d.String(), err)
	}
	// NOTE(review): on the not-found path initCkpt is used below without a
	// check; if Query can return a nil pointer in that case the comparison
	// against initCkpt.SequenceNumber would panic — confirm against the writer
	// implementations.

	// Keep a handle on the ticker so it is released when the manager exits.
	ticker := time.NewTicker(CheckpointFlushInterval * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		if d.close {
			return
		}

		// Read the position once per tick so that the value compared, the value
		// written and the value remembered as "previous" are all the same.
		position := d.checkpointPosition

		var ckpt map[string]interface{}
		if position == "" {
			if d.shardIt == "" {
				// Nothing to persist yet.
				continue
			}
			// update shardIt
			ckpt = map[string]interface{}{
				checkpoint.FieldShardIt:         d.shardIt,
				checkpoint.FieldTimestamp:       time.Now().Format(utils.GolangSecurityTime),
				checkpoint.FieldApproximateTime: d.checkpointApproximateTime,
			}
		} else {
			if position == prevCkptPosition {
				// Already persisted; skip the redundant write.
				continue
			}
			// do not update when checkpoint < init checkpoint
			// NOTE(review): this is a lexicographic string comparison; it only
			// orders sequence numbers correctly if they have equal length —
			// TODO confirm.
			if position < initCkpt.SequenceNumber {
				LOG.Warn("%s current checkpoint[%v] < init checkpoint[%v], no need to update", d.String(),
					position, initCkpt.SequenceNumber)
				continue
			}
			ckpt = map[string]interface{}{
				checkpoint.FieldSeqNum:          position,
				checkpoint.FieldIteratorType:    checkpoint.IteratorTypeAtSequence,
				checkpoint.FieldTimestamp:       time.Now().Format(utils.GolangSecurityTime),
				checkpoint.FieldApproximateTime: d.checkpointApproximateTime,
			}
		}

		updateErr := d.ckptWriter.UpdateWithSet(*d.shard.Shard.ShardId, ckpt, d.ns.Collection)
		if updateErr != nil {
			// Leave prevCkptPosition untouched so the same position is retried
			// on the next tick instead of being silently dropped.
			LOG.Error("%s update table[%v] shard[%v] input[%v] failed[%v]", d.String(), d.ns.Collection,
				*d.shard.Shard.ShardId, ckpt, updateErr)
			continue
		}

		prevCkptPosition = position
		LOG.Info("%s update table[%v] shard[%v] input[%v] ok", d.String(), d.ns.Collection,
			*d.shard.Shard.ShardId, ckpt)
	}
}