registry/nacos/watcher.go (339 lines of code) (raw):

// Copyright (c) 2022 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package nacos import ( "strconv" "strings" "sync" "time" "github.com/nacos-group/nacos-sdk-go/clients" "github.com/nacos-group/nacos-sdk-go/clients/naming_client" "github.com/nacos-group/nacos-sdk-go/common/constant" "github.com/nacos-group/nacos-sdk-go/model" "github.com/nacos-group/nacos-sdk-go/vo" "istio.io/api/networking/v1alpha3" "istio.io/pkg/log" apiv1 "github.com/alibaba/higress/api/networking/v1" "github.com/alibaba/higress/pkg/common" provider "github.com/alibaba/higress/registry" "github.com/alibaba/higress/registry/memory" ) const ( DefaultNacosTimeout = 5000 DefaultNacosLogLevel = "warn" DefaultNacosLogDir = "/var/log/nacos/log/" DefaultNacosCacheDir = "/var/log/nacos/cache/" DefaultNacosNotLoadCache = true DefaultNacosLogRotateTime = "24h" DefaultNacosLogMaxAge = 3 DefaultUpdateCacheWhenEmpty = true DefaultRefreshInterval = time.Second * 30 DefaultRefreshIntervalLimit = time.Second * 10 DefaultFetchPageSize = 50 DefaultJoiner = "@@" ) type watcher struct { provider.BaseWatcher apiv1.RegistryConfig WatchingServices map[string]bool `json:"watching_services"` RegistryType provider.ServiceRegistryType `json:"registry_type"` Status provider.WatcherStatus `json:"status"` namingClient naming_client.INamingClient cache memory.Cache mutex *sync.Mutex stop chan struct{} isStop bool updateCacheWhenEmpty bool authOption provider.AuthOption } type WatcherOption func(w *watcher) func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) { w := &watcher{ WatchingServices: make(map[string]bool), RegistryType: provider.Nacos, Status: provider.UnHealthy, cache: cache, mutex: &sync.Mutex{}, stop: make(chan struct{}), } w.NacosRefreshInterval = int64(DefaultRefreshInterval) for _, opt := range opts { opt(w) } if w.NacosNamespace == "" { w.NacosNamespace = w.NacosNamespaceId } log.Infof("new nacos watcher with config Name:%s", w.Name) cc := constant.NewClientConfig( constant.WithTimeoutMs(DefaultNacosTimeout), constant.WithLogLevel(DefaultNacosLogLevel), constant.WithLogDir(DefaultNacosLogDir), constant.WithCacheDir(DefaultNacosCacheDir), constant.WithNotLoadCacheAtStart(DefaultNacosNotLoadCache), constant.WithRotateTime(DefaultNacosLogRotateTime), constant.WithMaxAge(DefaultNacosLogMaxAge), constant.WithUpdateCacheWhenEmpty(w.updateCacheWhenEmpty), constant.WithNamespaceId(w.NacosNamespaceId), ) sc := []constant.ServerConfig{ *constant.NewServerConfig(w.Domain, uint64(w.Port)), } namingClient, err := clients.NewNamingClient(vo.NacosClientParam{ ClientConfig: cc, ServerConfigs: sc, }) if err != nil { log.Errorf("can not create naming client, err:%v", err) return nil, err } w.namingClient = namingClient return w, nil } func WithNacosNamespaceId(nacosNamespaceId string) WatcherOption { return func(w *watcher) { if nacosNamespaceId == "" { w.NacosNamespaceId = "public" } else { w.NacosNamespaceId = nacosNamespaceId } } } func WithNacosNamespace(nacosNamespace string) WatcherOption { return func(w *watcher) { w.NacosNamespace = nacosNamespace } } func WithNacosGroups(nacosGroups []string) WatcherOption { return func(w *watcher) { w.NacosGroups = nacosGroups } } func WithNacosRefreshInterval(refreshInterval int64) WatcherOption { return func(w *watcher) { if refreshInterval < int64(DefaultRefreshIntervalLimit) { refreshInterval = int64(DefaultRefreshIntervalLimit) } w.NacosRefreshInterval = refreshInterval } } func WithType(t string) WatcherOption { return func(w *watcher) { w.Type = t } } func WithName(name string) WatcherOption { return func(w *watcher) { w.Name = name } } func WithDomain(domain string) WatcherOption { return func(w *watcher) { w.Domain = domain } } func WithPort(port uint32) WatcherOption { return func(w *watcher) { w.Port = port } } func WithUpdateCacheWhenEmpty(enable bool) WatcherOption { return func(w *watcher) { w.updateCacheWhenEmpty = enable } } func WithAuthOption(authOption provider.AuthOption) WatcherOption { return func(w *watcher) { w.authOption = authOption } } func (w *watcher) Run() { ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval)) defer ticker.Stop() w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10)) w.fetchAllServices() w.Ready(true) for { select { case <-ticker.C: w.fetchAllServices() case <-w.stop: return } } } func (w *watcher) fetchAllServices() error { w.mutex.Lock() defer w.mutex.Unlock() if w.isStop { return nil } fetchedServices := make(map[string]bool) for _, groupName := range w.NacosGroups { for page := 1; ; page++ { ss, err := w.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{ GroupName: groupName, PageNo: uint32(page), PageSize: DefaultFetchPageSize, NameSpace: w.NacosNamespace, }) if err != nil { log.Errorf("fetch all services error:%v", err) break } for _, serviceName := range ss.Doms { fetchedServices[groupName+DefaultJoiner+serviceName] = true } if len(ss.Doms) < DefaultFetchPageSize { break } } } for key := range w.WatchingServices { if _, exist := fetchedServices[key]; !exist { s := strings.Split(key, DefaultJoiner) err := w.unsubscribe(s[0], s[1]) if err == nil { delete(w.WatchingServices, key) } } } for key := range fetchedServices { if _, exist := w.WatchingServices[key]; !exist { s := strings.Split(key, DefaultJoiner) if !shouldSubscribe(s[1]) { continue } err := w.subscribe(s[0], s[1]) if err == nil { w.WatchingServices[key] = true } } } return nil } func (w *watcher) subscribe(groupName string, serviceName string) error { log.Debugf("subscribe service, groupName:%s, serviceName:%s", groupName, serviceName) err := w.namingClient.Subscribe(&vo.SubscribeParam{ ServiceName: serviceName, GroupName: groupName, SubscribeCallback: w.getSubscribeCallback(groupName, serviceName), }) if err != nil { log.Errorf("subscribe service error:%v, groupName:%s, serviceName:%s", err, groupName, serviceName) return err } return nil } func (w *watcher) unsubscribe(groupName string, serviceName string) error { log.Debugf("unsubscribe service, groupName:%s, serviceName:%s", groupName, serviceName) err := w.namingClient.Unsubscribe(&vo.SubscribeParam{ ServiceName: serviceName, GroupName: groupName, SubscribeCallback: w.getSubscribeCallback(groupName, serviceName), }) if err != nil { log.Errorf("unsubscribe service error:%v, groupName:%s, serviceName:%s", err, groupName, serviceName) return err } return nil } func (w *watcher) getSubscribeCallback(groupName string, serviceName string) func(services []model.SubscribeService, err error) { suffix := strings.Join([]string{groupName, w.NacosNamespace, w.Type}, common.DotSeparator) suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen) host := strings.Join([]string{serviceName, suffix}, common.DotSeparator) return func(services []model.SubscribeService, err error) { defer w.UpdateService() //log.Info("callback", "serviceName", serviceName, "suffix", suffix, "details", services) if err != nil { if strings.Contains(err.Error(), "hosts is empty") { if w.updateCacheWhenEmpty { w.cache.DeleteServiceWrapper(host) } } else { log.Errorf("callback error:%v", err) } return } if len(services) > 0 && services[0].Metadata != nil && services[0].Metadata["register-resource"] == "mcp-bridge" { return } serviceEntry := w.generateServiceEntry(host, services) w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{ ServiceName: serviceName, ServiceEntry: serviceEntry, Suffix: suffix, RegistryType: w.Type, RegistryName: w.Name, }) } } func (w *watcher) generateServiceEntry(host string, services []model.SubscribeService) *v1alpha3.ServiceEntry { portList := make([]*v1alpha3.ServicePort, 0) endpoints := make([]*v1alpha3.WorkloadEntry, 0) for _, service := range services { protocol := common.HTTP if service.Metadata != nil && service.Metadata["protocol"] != "" { protocol = common.ParseProtocol(service.Metadata["protocol"]) } else { service.Metadata = make(map[string]string) } port := &v1alpha3.ServicePort{ Name: protocol.String(), Number: uint32(service.Port), Protocol: protocol.String(), } if len(portList) == 0 { portList = append(portList, port) } endpoint := v1alpha3.WorkloadEntry{ Address: service.Ip, Ports: map[string]uint32{port.Protocol: port.Number}, Labels: service.Metadata, } endpoints = append(endpoints, &endpoint) } se := &v1alpha3.ServiceEntry{ Hosts: []string{host}, Ports: portList, Location: v1alpha3.ServiceEntry_MESH_INTERNAL, Resolution: v1alpha3.ServiceEntry_STATIC, Endpoints: endpoints, } return se } func (w *watcher) Stop() { w.mutex.Lock() defer w.mutex.Unlock() for key := range w.WatchingServices { s := strings.Split(key, DefaultJoiner) err := w.unsubscribe(s[0], s[1]) if err == nil { delete(w.WatchingServices, key) } // clean the cache suffix := strings.Join([]string{s[0], w.NacosNamespace, w.Type}, common.DotSeparator) suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen) host := strings.Join([]string{s[1], suffix}, common.DotSeparator) w.cache.DeleteServiceWrapper(host) } w.isStop = true close(w.stop) w.Ready(false) } func (w *watcher) IsHealthy() bool { return w.Status == provider.Healthy } func (w *watcher) GetRegistryType() string { return w.RegistryType.String() } func shouldSubscribe(serviceName string) bool { prefixFilters := []string{"consumers:"} fullFilters := []string{""} for _, f := range prefixFilters { if strings.HasPrefix(serviceName, f) { return false } } for _, f := range fullFilters { if serviceName == f { return false } } return true }