func()

in pkg/adapter/dubboregistry/registry/nacos/application_service_listener.go [76:135]


func (l *appServiceListener) Callback(services []nacosModel.SubscribeService, err error) {
	if err != nil {
		logger.Errorf("nacos subscribe callback error:%s", err.Error())
		return
	}

	addInstances := make([]nacosModel.Instance, 0, len(services))
	delInstances := make([]nacosModel.Instance, 0, len(services))
	updateInstances := make([]nacosModel.Instance, 0, len(services))
	newInstanceMap := make(map[string]nacosModel.Instance, len(services))

	l.cacheLock.Lock()
	defer l.cacheLock.Unlock()
	for i := range services {
		if !services[i].Enable {
			// instance is not available, so ignore it
			continue
		}
		host := services[i].Ip + ":" + strconv.Itoa(int(services[i].Port))
		services[i].ServiceName = handleServiceName(services[i].ServiceName)
		instance := generateInstance(services[i])
		newInstanceMap[host] = instance
		if old, ok := l.instanceMap[host]; ok {
			// instance does not exist in cache, add it to cache
			addInstances = append(addInstances, instance)
		} else {
			if !reflect.DeepEqual(old, instance) {
				// instance is not different from cache, update it to cache
				updateInstances = append(updateInstances, instance)
			}
		}
	}

	for host, inst := range l.instanceMap {
		if _, ok := newInstanceMap[host]; !ok {
			// cache instance does not exist in new instance list, remove it from cache
			delInstances = append(delInstances, inst)
		}
	}

	l.instanceMap = newInstanceMap
	for i := range addInstances {
		newURLs := l.getURLs(addInstances[i])
		for _, url := range newURLs {
			l.handle(url, remoting.EventTypeAdd)
		}
	}
	for i := range delInstances {
		newURLs := l.getURLs(delInstances[i])
		for _, url := range newURLs {
			l.handle(url, remoting.EventTypeDel)
		}
	}
	for i := range updateInstances {
		newURLs := l.getURLs(updateInstances[i])
		for _, url := range newURLs {
			l.handle(url, remoting.EventTypeUpdate)
		}
	}
}