registry/nacos/listener.go (169 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package nacos import ( "bytes" "net/url" "reflect" "strconv" "sync" ) import ( gxchan "github.com/dubbogo/gost/container/chan" nacosClient "github.com/dubbogo/gost/database/kv/nacos" "github.com/dubbogo/gost/log/logger" "github.com/nacos-group/nacos-sdk-go/v2/model" "github.com/nacos-group/nacos-sdk-go/v2/vo" perrors "github.com/pkg/errors" ) import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/config_center" "dubbo.apache.org/dubbo-go/v3/registry" "dubbo.apache.org/dubbo-go/v3/remoting" ) var ( listenerCache sync.Map ) type callback func(services []model.Instance, err error) type nacosListener struct { namingClient *nacosClient.NacosNamingClient serviceName string regURL *common.URL events *gxchan.UnboundedChan instanceMap map[string]model.Instance cacheLock sync.Mutex done chan struct{} subscribeParam *vo.SubscribeParam } // NewNacosListenerWithServiceName creates a data listener for nacos func NewNacosListenerWithServiceName(serviceName string, regURL *common.URL, namingClient *nacosClient.NacosNamingClient) *nacosListener { return &nacosListener{ namingClient: namingClient, serviceName: serviceName, regURL: regURL, events: gxchan.NewUnboundedChan(32), instanceMap: map[string]model.Instance{}, done: make(chan struct{}), } } func generateUrl(instance model.Instance) *common.URL { if instance.Metadata == nil { logger.Errorf("nacos instance metadata is empty,instance:%+v", instance) return nil } path := instance.Metadata["path"] myInterface := instance.Metadata["interface"] if len(path) == 0 && len(myInterface) == 0 { logger.Errorf("nacos instance metadata does not have both path key and interface key,instance:%+v", instance) return nil } if len(path) == 0 && len(myInterface) != 0 { path = "/" + myInterface } protocol := instance.Metadata["protocol"] if len(protocol) == 0 { logger.Errorf("nacos instance metadata does not have protocol key,instance:%+v", instance) return nil } urlMap := url.Values{} for k, v := range instance.Metadata { urlMap.Set(k, v) } return common.NewURLWithOptions( common.WithIp(instance.Ip), common.WithPort(strconv.Itoa(int(instance.Port))), common.WithProtocol(protocol), common.WithParams(urlMap), common.WithPath(path), ) } // Callback will be invoked when got subscribed events. func (nl *nacosListener) Callback(services []model.Instance, err error) { if err != nil { logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam) return } addInstances := make([]model.Instance, 0, len(services)) delInstances := make([]model.Instance, 0, len(services)) updateInstances := make([]model.Instance, 0, len(services)) newInstanceMap := make(map[string]model.Instance, len(services)) nl.cacheLock.Lock() defer nl.cacheLock.Unlock() for i := range services { if !services[i].Enable { // instance is not available,so ignore it continue } host := services[i].Ip + ":" + strconv.Itoa(int(services[i].Port)) instance := services[i] newInstanceMap[host] = instance if old, ok := nl.instanceMap[host]; !ok && instance.Healthy { // instance does not exist in cache, add it to cache addInstances = append(addInstances, instance) } else if !reflect.DeepEqual(old, instance) && instance.Healthy { // instance is not different from cache, update it to cache updateInstances = append(updateInstances, instance) } } for host, inst := range nl.instanceMap { if newInstance, ok := newInstanceMap[host]; !ok || !newInstance.Healthy { // cache instance does not exist in new instance list, remove it from cache delInstances = append(delInstances, inst) } } nl.instanceMap = newInstanceMap for i := range addInstances { if newUrl := generateUrl(addInstances[i]); newUrl != nil { nl.process(&config_center.ConfigChangeEvent{Value: newUrl, ConfigType: remoting.EventTypeAdd}) } } for i := range delInstances { if newUrl := generateUrl(delInstances[i]); newUrl != nil { nl.process(&config_center.ConfigChangeEvent{Value: newUrl, ConfigType: remoting.EventTypeDel}) } } for i := range updateInstances { if newUrl := generateUrl(updateInstances[i]); newUrl != nil { nl.process(&config_center.ConfigChangeEvent{Value: newUrl, ConfigType: remoting.EventTypeUpdate}) } } } func getSubscribeName(url *common.URL) string { var buffer bytes.Buffer buffer.Write([]byte(common.DubboNodes[common.PROVIDER])) appendParam(&buffer, url, constant.InterfaceKey) appendParam(&buffer, url, constant.VersionKey) appendParam(&buffer, url, constant.GroupKey) return buffer.String() } func (nl *nacosListener) listenService(serviceName string) error { if nl.namingClient == nil { return perrors.New("nacos naming namingClient stopped") } nl.subscribeParam = createSubscribeParam(serviceName, nl.regURL, nl.Callback) if nl.subscribeParam == nil { return perrors.New("create nacos subscribeParam failed") } err := nl.namingClient.Client().Subscribe(nl.subscribeParam) if err == nil { listenerCache.Store(nl.subscribeParam.ServiceName+nl.subscribeParam.GroupName, nl) } return nil } func (nl *nacosListener) stopListen() error { return nl.namingClient.Client().Unsubscribe(nl.subscribeParam) } func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) { nl.events.In() <- configType } // Next returns the service event from nacos. func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { for { select { case <-nl.done: logger.Warnf("nacos listener is close!service name:%+v", nl.serviceName) return nil, perrors.New("listener stopped") case val := <-nl.events.Out(): e, _ := val.(*config_center.ConfigChangeEvent) logger.Debugf("got nacos event %s", e) return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(*common.URL)}, nil } } } // nolint func (nl *nacosListener) Close() { _ = nl.stopListen() close(nl.done) }