in registry/nacos/listener.go [111:165]
func (nl *nacosListener) Callback(services []model.Instance, err error) {
if err != nil {
logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam)
return
}
addInstances := make([]model.Instance, 0, len(services))
delInstances := make([]model.Instance, 0, len(services))
updateInstances := make([]model.Instance, 0, len(services))
newInstanceMap := make(map[string]model.Instance, len(services))
nl.cacheLock.Lock()
defer nl.cacheLock.Unlock()
for i := range services {
if !services[i].Enable {
// instance is not available,so ignore it
continue
}
host := services[i].Ip + ":" + strconv.Itoa(int(services[i].Port))
instance := services[i]
newInstanceMap[host] = instance
if old, ok := nl.instanceMap[host]; !ok && instance.Healthy {
// instance does not exist in cache, add it to cache
addInstances = append(addInstances, instance)
} else if !reflect.DeepEqual(old, instance) && instance.Healthy {
// instance is not different from cache, update it to cache
updateInstances = append(updateInstances, instance)
}
}
for host, inst := range nl.instanceMap {
if newInstance, ok := newInstanceMap[host]; !ok || !newInstance.Healthy {
// cache instance does not exist in new instance list, remove it from cache
delInstances = append(delInstances, inst)
}
}
nl.instanceMap = newInstanceMap
for i := range addInstances {
if newUrl := generateUrl(addInstances[i]); newUrl != nil {
nl.process(&config_center.ConfigChangeEvent{Value: newUrl, ConfigType: remoting.EventTypeAdd})
}
}
for i := range delInstances {
if newUrl := generateUrl(delInstances[i]); newUrl != nil {
nl.process(&config_center.ConfigChangeEvent{Value: newUrl, ConfigType: remoting.EventTypeDel})
}
}
for i := range updateInstances {
if newUrl := generateUrl(updateInstances[i]); newUrl != nil {
nl.process(&config_center.ConfigChangeEvent{Value: newUrl, ConfigType: remoting.EventTypeUpdate})
}
}
}