func()

in pkg/adapter/dubboregistry/registry/nacos/service_listener.go [71:130]


// Callback receives the current list of subscribed services (or an error)
// and reconciles it with the listener's cached instance map.
//
// When err is non-nil it is logged and nothing else happens. Otherwise the
// enabled entries of services are keyed by "ip:port", compared against
// z.instanceMap, and classified as added, removed or changed. The cache is
// then replaced with the new snapshot and z.handle is invoked for every
// classified instance: first all adds, then all deletes, then all updates.
// Instances for which generateURL returns nil are skipped.
//
// The whole reconciliation, including the z.handle calls, runs while
// z.cacheLock is held.
func (z *serviceListener) Callback(services []nacosModel.SubscribeService, err error) {
	if err != nil {
		logger.Errorf("nacos subscribe callback error:%s", err.Error())
		return
	}

	var (
		added    = make([]nacosModel.Instance, 0, len(services))
		removed  = make([]nacosModel.Instance, 0, len(services))
		changed  = make([]nacosModel.Instance, 0, len(services))
		snapshot = make(map[string]nacosModel.Instance, len(services))
	)

	z.cacheLock.Lock()
	defer z.cacheLock.Unlock()

	// Pass 1: walk the incoming list, building the new snapshot and spotting
	// instances that are new or whose content differs from the cached copy.
	for idx := range services {
		svc := &services[idx]
		if !svc.Enable {
			// Disabled entries are left out of the snapshot entirely, so a
			// previously cached instance that became disabled ends up in the
			// removed set below.
			continue
		}

		key := svc.Ip + ":" + strconv.Itoa(int(svc.Port))
		current := generateInstance(*svc)
		snapshot[key] = current

		cached, known := z.instanceMap[key]
		switch {
		case !known:
			// Not in the cache yet: report as an addition.
			added = append(added, current)
		case !reflect.DeepEqual(cached, current):
			// Already cached but the content differs: report as an update.
			changed = append(changed, current)
		}
	}

	// Pass 2: anything cached that is missing from the snapshot has gone away.
	for key, cached := range z.instanceMap {
		if _, still := snapshot[key]; still {
			continue
		}
		removed = append(removed, cached)
	}

	// Swap in the new snapshot before notifying.
	z.instanceMap = snapshot

	for _, inst := range added {
		if u := generateURL(inst); u != nil {
			z.handle(u, remoting.EventTypeAdd)
		}
	}
	for _, inst := range removed {
		if u := generateURL(inst); u != nil {
			z.handle(u, remoting.EventTypeDel)
		}
	}
	for _, inst := range changed {
		if u := generateURL(inst); u != nil {
			z.handle(u, remoting.EventTypeUpdate)
		}
	}
}