in datasource/etcd/sd/servicecenter/syncer.go [111:176]
// checkWithConflictHandleFunc compares the remote snapshot with the local
// cache and publishes the differences through local.Notify:
//
//   - a remote key missing from the local cache produces EVT_CREATE;
//   - a remote key whose Rev differs from the cached ModRevision produces
//     EVT_UPDATE;
//   - a cached key that no longer appears in remote produces EVT_DELETE.
//
// Remote entries whose ClusterName equals the current SC's cluster name are
// ignored. When a key appears more than once in remote, only the first
// occurrence is processed; each later occurrence is reported by calling
// conflictHandleFunc with the first-seen KV, the remote getter and the index
// of the duplicate.
//
// skipClusters maps a cluster name to the error that made it unavailable.
// Update and delete events for cached entries that belong to one of these
// clusters are suppressed (and logged) instead of being published.
func (c *Syncer) checkWithConflictHandleFunc(local *Cacher, remote dump.Getter, skipClusters map[string]error,
	conflictHandleFunc func(origin *dump.KV, conflict dump.Getter, index int)) {
	// exists records the first KV seen for every remote key that does not
	// belong to the current cluster. It is used both for duplicate detection
	// here and for delete detection below.
	exists := make(map[string]*dump.KV)
	remote.ForEach(func(i int, v *dump.KV) bool {
		// because the result of the remote return may contain the same data as
		// the local cache of the current SC. So we need to ignore it and
		// prevent the aggregation result from increasing.
		if v.ClusterName == state.Configuration().ClusterName {
			return true
		}
		if kv, ok := exists[v.Key]; ok {
			conflictHandleFunc(kv, remote, i)
			return true
		}
		exists[v.Key] = v

		old := local.Cache().Get(v.Key)
		newKv := &kvstore.KeyValue{
			Key:         util.StringToBytesWithNoCopy(v.Key),
			Value:       v.Value,
			ModRevision: v.Rev,
			ClusterName: v.ClusterName,
		}
		switch {
		case old == nil:
			newKv.Version = 1
			newKv.CreateRevision = v.Rev
			local.Notify(pb.EVT_CREATE, v.Key, newKv)
		case old.ModRevision != v.Rev:
			// if connect to some cluster failed, then skip to notify changes
			// of these clusters to prevent publish the wrong changes events of kvs.
			if err, ok := skipClusters[old.ClusterName]; ok {
				log.Error(fmt.Sprintf("cluster[%s] temporarily unavailable, skip cluster[%s] event %s %s",
					old.ClusterName, v.ClusterName, pb.EVT_UPDATE, v.Key), err)
				break
			}
			newKv.Version = 1 + old.Version
			newKv.CreateRevision = old.CreateRevision
			local.Notify(pb.EVT_UPDATE, v.Key, newKv)
		}
		return true
	})

	// Deletions are collected during the cache walk and published only after
	// it finishes, so Notify is never called from inside Cache().ForEach.
	var deletes []*kvstore.KeyValue
	local.Cache().ForEach(func(key string, v *kvstore.KeyValue) (next bool) {
		// A cached key is still alive if any non-local remote entry carries
		// it; exists already holds exactly that key set, so a map lookup
		// replaces a full rescan of remote for every cached entry.
		if _, ok := exists[key]; ok {
			return true
		}
		if err, ok := skipClusters[v.ClusterName]; ok {
			log.Error(fmt.Sprintf("cluster[%s] temporarily unavailable, skip event %s %s",
				v.ClusterName, pb.EVT_DELETE, v.Key), err)
			return true
		}
		deletes = append(deletes, v)
		return true
	})
	for _, v := range deletes {
		local.Notify(pb.EVT_DELETE, util.BytesToStringWithNoCopy(v.Key), v)
	}
}