func()

in registry/nacos/listener.go [109:163]


// Callback receives the instance list pushed for the subscribed service and
// turns the difference between that list and nl.instanceMap into change events.
//
// A non-nil err is logged and the push is dropped without touching the cache.
// Otherwise every enabled instance is keyed by "ip:port" and stored in a fresh
// snapshot that replaces nl.instanceMap. Events are handed to nl.process in
// the order: additions, deletions, updates. nl.cacheLock is held for the whole
// call, including the nl.process invocations.
func (nl *nacosListener) Callback(services []model.Instance, err error) {
	if err != nil {
		logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam)
		return
	}

	total := len(services)
	latest := make(map[string]model.Instance, total)
	added := make([]model.Instance, 0, total)
	removed := make([]model.Instance, 0, total)
	changed := make([]model.Instance, 0, total)

	nl.cacheLock.Lock()
	defer nl.cacheLock.Unlock()

	for _, current := range services {
		if !current.Enable {
			// Disabled instances are skipped entirely: not cached, not reported.
			continue
		}
		key := current.Ip + ":" + strconv.Itoa(int(current.Port))
		latest[key] = current
		if !current.Healthy {
			// Unhealthy instances are kept in the snapshot but never produce
			// an add or update event.
			continue
		}
		previous, cached := nl.instanceMap[key]
		switch {
		case !cached:
			// Healthy instance that the cache has not seen yet.
			added = append(added, current)
		case !reflect.DeepEqual(previous, current):
			// Healthy instance that differs from the cached copy.
			changed = append(changed, current)
		}
	}

	// NOTE(review): the cache also holds instances that were unhealthy in the
	// previous push, so such an entry is reported as deleted again whenever it
	// is still unhealthy or has disappeared; confirm consumers tolerate this.
	for key, stale := range nl.instanceMap {
		replacement, present := latest[key]
		if present && replacement.Healthy {
			continue
		}
		// The cached instance is missing from the new list or is now unhealthy.
		removed = append(removed, stale)
	}

	nl.instanceMap = latest

	for _, inst := range added {
		target := generateUrl(inst)
		if target == nil {
			continue
		}
		nl.process(&config_center.ConfigChangeEvent{Value: target, ConfigType: remoting.EventTypeAdd})
	}

	for _, inst := range removed {
		target := generateUrl(inst)
		if target == nil {
			continue
		}
		nl.process(&config_center.ConfigChangeEvent{Value: target, ConfigType: remoting.EventTypeDel})
	}

	for _, inst := range changed {
		target := generateUrl(inst)
		if target == nil {
			continue
		}
		nl.process(&config_center.ConfigChangeEvent{Value: target, ConfigType: remoting.EventTypeUpdate})
	}
}