func()

in nimo-shake/incr-sync/syncer.go [593:648]


func (d *Dispatcher) ckptManager() {
	var prevCkptPosition string

	initCkpt, err := d.ckptWriter.Query(*d.shard.Shard.ShardId, d.ns.Collection)
	if err != nil && err.Error() != utils.NotFountErr {
		LOG.Crashf("%s query checkpoint failed[%v]", d.String(), err)
	}

	for range time.NewTicker(CheckpointFlushInterval * time.Second).C {
		if d.close {
			break
		}

		var ckpt map[string]interface{}
		if d.checkpointPosition == "" {
			if d.shardIt != "" {
				// update shardIt
				ckpt = map[string]interface{}{
					checkpoint.FieldShardIt:         d.shardIt,
					checkpoint.FieldTimestamp:       time.Now().Format(utils.GolangSecurityTime),
					checkpoint.FieldApproximateTime: d.checkpointApproximateTime,
				}

			} else {
				continue
			}
		} else {
			if d.checkpointPosition == prevCkptPosition {
				continue
			}
			// do not update when checkpoint < init checkpoint
			if d.checkpointPosition < initCkpt.SequenceNumber {
				LOG.Warn("%s current checkpoint[%v] < init checkpoint[%v], no need to update", d.String(),
					d.checkpointPosition, initCkpt.SequenceNumber)
				continue
			}

			ckpt = map[string]interface{}{
				checkpoint.FieldSeqNum:          d.checkpointPosition,
				checkpoint.FieldIteratorType:    checkpoint.IteratorTypeAtSequence,
				checkpoint.FieldTimestamp:       time.Now().Format(utils.GolangSecurityTime),
				checkpoint.FieldApproximateTime: d.checkpointApproximateTime,
			}
		}

		prevCkptPosition = d.checkpointPosition
		err := d.ckptWriter.UpdateWithSet(*d.shard.Shard.ShardId, ckpt, d.ns.Collection)
		if err != nil {
			LOG.Error("%s update table[%v] shard[%v] input[%v] failed[%v]", d.String(), d.ns.Collection,
				*d.shard.Shard.ShardId, ckpt, err)
		} else {
			LOG.Info("%s update table[%v] shard[%v] input[%v] ok", d.String(), d.ns.Collection,
				*d.shard.Shard.ShardId, ckpt)
		}
	}
}