func()

in datasource/etcd/sd/servicecenter/syncer.go [111:176]


func (c *Syncer) checkWithConflictHandleFunc(local *Cacher, remote dump.Getter, skipClusters map[string]error,
	conflictHandleFunc func(origin *dump.KV, conflict dump.Getter, index int)) {
	exists := make(map[string]*dump.KV)
	remote.ForEach(func(i int, v *dump.KV) bool {
		// because the result of the remote return may contain the same data as
		// the local cache of the current SC. So we need to ignore it and
		// prevent the aggregation result from increasing.
		if v.ClusterName == state.Configuration().ClusterName {
			return true
		}
		if kv, ok := exists[v.Key]; ok {
			conflictHandleFunc(kv, remote, i)
			return true
		}
		exists[v.Key] = v
		old := local.Cache().Get(v.Key)
		newKv := &kvstore.KeyValue{
			Key:         util.StringToBytesWithNoCopy(v.Key),
			Value:       v.Value,
			ModRevision: v.Rev,
			ClusterName: v.ClusterName,
		}
		switch {
		case old == nil:
			newKv.Version = 1
			newKv.CreateRevision = v.Rev
			local.Notify(pb.EVT_CREATE, v.Key, newKv)
		case old.ModRevision != v.Rev:
			// if connect to some cluster failed, then skip to notify changes
			// of these clusters to prevent publish the wrong changes events of kvs.
			if err, ok := skipClusters[old.ClusterName]; ok {
				log.Error(fmt.Sprintf("cluster[%s] temporarily unavailable, skip cluster[%s] event %s %s",
					old.ClusterName, v.ClusterName, pb.EVT_UPDATE, v.Key), err)
				break
			}
			newKv.Version = 1 + old.Version
			newKv.CreateRevision = old.CreateRevision
			local.Notify(pb.EVT_UPDATE, v.Key, newKv)
		}
		return true
	})

	var deletes []*kvstore.KeyValue
	local.Cache().ForEach(func(key string, v *kvstore.KeyValue) (next bool) {
		var exist bool
		remote.ForEach(func(_ int, v *dump.KV) bool {
			if v.ClusterName == state.Configuration().ClusterName {
				return true
			}
			exist = v.Key == key
			return !exist
		})
		if !exist {
			if err, ok := skipClusters[v.ClusterName]; ok {
				log.Error(fmt.Sprintf("cluster[%s] temporarily unavailable, skip event %s %s",
					v.ClusterName, pb.EVT_DELETE, v.Key), err)
				return true
			}
			deletes = append(deletes, v)
		}
		return true
	})
	for _, v := range deletes {
		local.Notify(pb.EVT_DELETE, util.BytesToStringWithNoCopy(v.Key), v)
	}
}