in pkg/adapter/dubboregistry/registry/nacos/application_service_listener.go [76:135]
// Callback handles a nacos subscription notification. It diffs the enabled
// entries in services against the cached l.instanceMap (keyed by "ip:port"),
// replaces the cache with the new snapshot, and then calls l.handle for every
// URL that l.getURLs returns for an added, deleted, or updated instance.
//
// When err is non-nil it is logged and the cache is left untouched.
// The ServiceName of each enabled entry in services is rewritten in place
// with the result of handleServiceName.
func (l *appServiceListener) Callback(services []nacosModel.SubscribeService, err error) {
	if err != nil {
		logger.Errorf("nacos subscribe callback error:%s", err.Error())
		return
	}

	addInstances := make([]nacosModel.Instance, 0, len(services))
	delInstances := make([]nacosModel.Instance, 0, len(services))
	updateInstances := make([]nacosModel.Instance, 0, len(services))
	newInstanceMap := make(map[string]nacosModel.Instance, len(services))

	// The lock is held for the whole diff-and-dispatch sequence, so the cache
	// swap and the events derived from it are not interleaved with another
	// Callback invocation.
	l.cacheLock.Lock()
	defer l.cacheLock.Unlock()

	for i := range services {
		if !services[i].Enable {
			// instance is not available, so ignore it; if it was cached it
			// will be reported as deleted below
			continue
		}
		host := services[i].Ip + ":" + strconv.Itoa(int(services[i].Port))
		services[i].ServiceName = handleServiceName(services[i].ServiceName)
		instance := generateInstance(services[i])
		newInstanceMap[host] = instance

		old, ok := l.instanceMap[host]
		switch {
		case !ok:
			// instance does not exist in cache, report it as added
			addInstances = append(addInstances, instance)
		case !reflect.DeepEqual(old, instance):
			// instance differs from the cached one, report it as updated
			updateInstances = append(updateInstances, instance)
		}
	}

	for host, inst := range l.instanceMap {
		if _, ok := newInstanceMap[host]; !ok {
			// cached instance is missing from the new snapshot, report it as deleted
			delInstances = append(delInstances, inst)
		}
	}

	l.instanceMap = newInstanceMap

	for i := range addInstances {
		for _, url := range l.getURLs(addInstances[i]) {
			l.handle(url, remoting.EventTypeAdd)
		}
	}
	for i := range delInstances {
		for _, url := range l.getURLs(delInstances[i]) {
			l.handle(url, remoting.EventTypeDel)
		}
	}
	for i := range updateInstances {
		for _, url := range l.getURLs(updateInstances[i]) {
			l.handle(url, remoting.EventTypeUpdate)
		}
	}
}