in pkg/adapter/dubboregistry/registry/nacos/service_listener.go [71:130]
// Callback receives a subscription notification carrying the current list of
// service instances and reconciles it with the listener's cached instanceMap.
//
// If err is non-nil it is logged and the notification is dropped. Otherwise
// every enabled entry in services is keyed by "ip:port" and compared with the
// cache to classify it as added or updated; cached entries whose key is absent
// from the notification are classified as deleted. The cache is then replaced
// by the new snapshot and one event per classified instance is passed to
// z.handle, in the order: adds, deletes, updates.
//
// z.cacheLock is held for the whole reconciliation, including the z.handle
// calls, because the unlock is deferred to function return.
func (z *serviceListener) Callback(services []nacosModel.SubscribeService, err error) {
	if err != nil {
		logger.Errorf("nacos subscribe callback error:%s", err.Error())
		return
	}

	sizeHint := len(services)
	var (
		added   = make([]nacosModel.Instance, 0, sizeHint)
		removed = make([]nacosModel.Instance, 0, sizeHint)
		changed = make([]nacosModel.Instance, 0, sizeHint)
		latest  = make(map[string]nacosModel.Instance, sizeHint)
	)

	z.cacheLock.Lock()
	defer z.cacheLock.Unlock()

	// Pass 1: walk the incoming snapshot and sort each enabled entry into
	// "added" (key unknown to the cache) or "changed" (key known, content differs).
	for idx := range services {
		svc := &services[idx]
		if !svc.Enable {
			// Disabled entries are skipped entirely, so they never reach the new snapshot.
			continue
		}

		key := svc.Ip + ":" + strconv.Itoa(int(svc.Port))
		current := generateInstance(*svc)
		latest[key] = current

		previous, known := z.instanceMap[key]
		switch {
		case !known:
			// Not cached yet: report as a new instance.
			added = append(added, current)
		case !reflect.DeepEqual(previous, current):
			// Cached but different from the incoming value: report as an update.
			changed = append(changed, current)
		}
	}

	// Pass 2: anything still cached but missing from the new snapshot is gone.
	for key, stale := range z.instanceMap {
		if _, kept := latest[key]; kept {
			continue
		}
		removed = append(removed, stale)
	}

	// Swap in the new snapshot before dispatching events.
	z.instanceMap = latest

	// Dispatch: instances for which generateURL yields nil are silently skipped.
	for _, inst := range added {
		if u := generateURL(inst); u != nil {
			z.handle(u, remoting.EventTypeAdd)
		}
	}
	for _, inst := range removed {
		if u := generateURL(inst); u != nil {
			z.handle(u, remoting.EventTypeDel)
		}
	}
	for _, inst := range changed {
		if u := generateURL(inst); u != nil {
			z.handle(u, remoting.EventTypeUpdate)
		}
	}
}