registry/nacos/v2/watcher.go (450 lines of code) (raw):

// Copyright (c) 2022 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package v2 import ( "errors" "strconv" "strings" "sync" "time" "github.com/nacos-group/nacos-sdk-go/v2/clients" "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client" "github.com/nacos-group/nacos-sdk-go/v2/common/constant" "github.com/nacos-group/nacos-sdk-go/v2/model" "github.com/nacos-group/nacos-sdk-go/v2/vo" "go.uber.org/atomic" "istio.io/api/networking/v1alpha3" "istio.io/pkg/log" apiv1 "github.com/alibaba/higress/api/networking/v1" "github.com/alibaba/higress/pkg/common" provider "github.com/alibaba/higress/registry" "github.com/alibaba/higress/registry/memory" "github.com/alibaba/higress/registry/nacos/address" ) const ( DefaultInitTimeout = time.Second * 10 DefaultNacosTimeout = 5000 DefaultNacosLogLevel = "warn" DefaultNacosLogDir = "/var/log/nacos/log/" DefaultNacosCacheDir = "/var/log/nacos/cache/" DefaultNacosNotLoadCache = true DefaultNacosLogRotateTime = "24h" DefaultNacosLogMaxAge = 3 DefaultUpdateCacheWhenEmpty = true DefaultRefreshInterval = time.Second * 30 DefaultRefreshIntervalLimit = time.Second * 10 DefaultFetchPageSize = 50 DefaultJoiner = "@@" ) type watcher struct { provider.BaseWatcher apiv1.RegistryConfig WatchingServices map[string]bool `json:"watching_services"` RegistryType provider.ServiceRegistryType `json:"registry_type"` Status provider.WatcherStatus `json:"status"` namingClient naming_client.INamingClient cache memory.Cache mutex *sync.Mutex stop chan struct{} isStop bool addrProvider *address.NacosAddressProvider updateCacheWhenEmpty bool nacosClientConfig *constant.ClientConfig authOption provider.AuthOption } type WatcherOption func(w *watcher) func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) { w := &watcher{ WatchingServices: make(map[string]bool), RegistryType: provider.Nacos2, Status: provider.UnHealthy, cache: cache, mutex: &sync.Mutex{}, stop: make(chan struct{}), } w.NacosRefreshInterval = int64(DefaultRefreshInterval) for _, opt := range opts { opt(w) } if w.NacosNamespace == "" { w.NacosNamespace = w.NacosNamespaceId } log.Infof("new nacos2 watcher with config Name:%s", w.Name) w.nacosClientConfig = constant.NewClientConfig( constant.WithTimeoutMs(DefaultNacosTimeout), constant.WithLogLevel(DefaultNacosLogLevel), constant.WithLogDir(DefaultNacosLogDir), constant.WithCacheDir(DefaultNacosCacheDir), constant.WithNotLoadCacheAtStart(DefaultNacosNotLoadCache), constant.WithLogRollingConfig(&constant.ClientLogRollingConfig{ MaxAge: DefaultNacosLogMaxAge, }), constant.WithUpdateCacheWhenEmpty(w.updateCacheWhenEmpty), constant.WithNamespaceId(w.NacosNamespaceId), constant.WithAccessKey(w.NacosAccessKey), constant.WithSecretKey(w.NacosSecretKey), constant.WithUsername(w.authOption.NacosUsername), constant.WithPassword(w.authOption.NacosPassword), ) initTimer := time.NewTimer(DefaultInitTimeout) if w.NacosAddressServer != "" { w.addrProvider = address.NewNacosAddressProvider(w.NacosAddressServer, w.NacosNamespace) w.Domain = "" select { case w.Domain = <-w.addrProvider.GetNacosAddress(w.Domain): case <-initTimer.C: return nil, errors.New("new nacos2 watcher timeout") } go w.updateNacosClient() } sc := []constant.ServerConfig{ *constant.NewServerConfig(w.Domain, uint64(w.Port)), } success := make(chan struct{}) go func() { namingClient, err := clients.NewNamingClient(vo.NacosClientParam{ ClientConfig: w.nacosClientConfig, ServerConfigs: sc, }) if err == nil { w.namingClient = namingClient close(success) } else { log.Errorf("can not create naming client, err:%v", err) } }() select { case <-initTimer.C: return nil, errors.New("new nacos2 watcher timeout") case <-success: return w, nil } } func WithNacosAddressServer(nacosAddressServer string) WatcherOption { return func(w *watcher) { w.NacosAddressServer = nacosAddressServer } } func WithNacosAccessKey(nacosAccessKey string) WatcherOption { return func(w *watcher) { w.NacosAccessKey = nacosAccessKey } } func WithNacosSecretKey(nacosSecretKey string) WatcherOption { return func(w *watcher) { w.NacosSecretKey = nacosSecretKey } } func WithNacosNamespaceId(nacosNamespaceId string) WatcherOption { return func(w *watcher) { if nacosNamespaceId == "" { w.NacosNamespaceId = "public" } else { w.NacosNamespaceId = nacosNamespaceId } } } func WithNacosNamespace(nacosNamespace string) WatcherOption { return func(w *watcher) { w.NacosNamespace = nacosNamespace } } func WithNacosGroups(nacosGroups []string) WatcherOption { return func(w *watcher) { w.NacosGroups = nacosGroups } } func WithNacosRefreshInterval(refreshInterval int64) WatcherOption { return func(w *watcher) { if refreshInterval < int64(DefaultRefreshIntervalLimit) { refreshInterval = int64(DefaultRefreshIntervalLimit) } w.NacosRefreshInterval = refreshInterval } } func WithType(t string) WatcherOption { return func(w *watcher) { w.Type = t } } func WithName(name string) WatcherOption { return func(w *watcher) { w.Name = name } } func WithDomain(domain string) WatcherOption { return func(w *watcher) { w.Domain = domain } } func WithPort(port uint32) WatcherOption { return func(w *watcher) { w.Port = port } } func WithUpdateCacheWhenEmpty(enable bool) WatcherOption { return func(w *watcher) { w.updateCacheWhenEmpty = enable } } func WithAuthOption(authOption provider.AuthOption) WatcherOption { return func(w *watcher) { w.authOption = authOption } } func (w *watcher) Run() { ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval)) defer ticker.Stop() w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10)) err := w.fetchAllServices() if err != nil { log.Errorf("first fetch services failed, err:%v", err) } else { w.Ready(true) } for { select { case <-ticker.C: err := w.fetchAllServices() if err != nil { log.Errorf("fetch services failed, err:%v", err) } else { w.Ready(true) } case <-w.stop: return } } } func (w *watcher) updateNacosClient() { for { select { case addr := <-w.addrProvider.GetNacosAddress(w.Domain): func() { w.mutex.Lock() defer w.mutex.Unlock() w.Domain = addr namingClient, err := clients.NewNamingClient(vo.NacosClientParam{ ClientConfig: w.nacosClientConfig, ServerConfigs: []constant.ServerConfig{ *constant.NewServerConfig(addr, uint64(w.Port)), }, }) if err != nil { log.Errorf("can not update naming client, err:%v", err) return } w.namingClient = namingClient log.Info("naming client updated") }() case <-w.stop: return } } } func (w *watcher) fetchAllServices() error { w.mutex.Lock() defer w.mutex.Unlock() if w.isStop { return nil } fetchedServices := make(map[string]bool) var tries int for _, groupName := range w.NacosGroups { for page := 1; ; page++ { ss, err := w.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{ GroupName: groupName, PageNo: uint32(page), PageSize: DefaultFetchPageSize, NameSpace: w.NacosNamespace, }) if err != nil { if tries > 10 { return err } if w.addrProvider != nil { w.addrProvider.Trigger() } log.Errorf("fetch nacos service list failed, err:%v, pageNo:%d", err, page) page-- tries++ continue } for _, serviceName := range ss.Doms { fetchedServices[groupName+DefaultJoiner+serviceName] = true } if len(ss.Doms) < DefaultFetchPageSize { break } } } for key := range w.WatchingServices { if _, exist := fetchedServices[key]; !exist { s := strings.Split(key, DefaultJoiner) err := w.unsubscribe(s[0], s[1]) if err != nil { return err } delete(w.WatchingServices, key) } } wg := sync.WaitGroup{} subscribeFailed := atomic.NewBool(false) watchingKeys := make(chan string, len(fetchedServices)) for key := range fetchedServices { if _, exist := w.WatchingServices[key]; !exist { s := strings.Split(key, DefaultJoiner) if !shouldSubscribe(s[1]) { continue } wg.Add(1) go func(k string) { err := w.subscribe(s[0], s[1]) if err != nil { subscribeFailed.Store(true) log.Errorf("subscribe failed, err:%v, group:%s, service:%s", err, s[0], s[1]) } else { watchingKeys <- k } wg.Done() }(key) } } wg.Wait() close(watchingKeys) for key := range watchingKeys { w.WatchingServices[key] = true } if subscribeFailed.Load() { return errors.New("subscribe services failed") } return nil } func (w *watcher) subscribe(groupName string, serviceName string) error { log.Debugf("subscribe service, groupName:%s, serviceName:%s", groupName, serviceName) err := w.namingClient.Subscribe(&vo.SubscribeParam{ ServiceName: serviceName, GroupName: groupName, SubscribeCallback: w.getSubscribeCallback(groupName, serviceName), }) if err != nil { log.Errorf("subscribe service error:%v, groupName:%s, serviceName:%s", err, groupName, serviceName) return err } return nil } func (w *watcher) unsubscribe(groupName string, serviceName string) error { log.Debugf("unsubscribe service, groupName:%s, serviceName:%s", groupName, serviceName) err := w.namingClient.Unsubscribe(&vo.SubscribeParam{ ServiceName: serviceName, GroupName: groupName, SubscribeCallback: w.getSubscribeCallback(groupName, serviceName), }) if err != nil { log.Errorf("unsubscribe service error:%v, groupName:%s, serviceName:%s", err, groupName, serviceName) return err } return nil } func (w *watcher) getSubscribeCallback(groupName string, serviceName string) func(services []model.Instance, err error) { suffix := strings.Join([]string{groupName, w.NacosNamespace, "nacos"}, common.DotSeparator) suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen) host := strings.Join([]string{serviceName, suffix}, common.DotSeparator) return func(services []model.Instance, err error) { defer w.UpdateService() //log.Info("callback", "serviceName", serviceName, "suffix", suffix, "details", services) if err != nil { if strings.Contains(err.Error(), "hosts is empty") { if w.updateCacheWhenEmpty { w.cache.DeleteServiceWrapper(host) } } else { log.Errorf("callback error:%v", err) } return } if len(services) > 0 && services[0].Metadata != nil && services[0].Metadata["register-resource"] == "mcp-bridge" { return } serviceEntry := w.generateServiceEntry(host, services) w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{ ServiceName: serviceName, ServiceEntry: serviceEntry, Suffix: suffix, RegistryType: w.Type, RegistryName: w.Name, }) } } func (w *watcher) generateServiceEntry(host string, services []model.Instance) *v1alpha3.ServiceEntry { portList := make([]*v1alpha3.ServicePort, 0) endpoints := make([]*v1alpha3.WorkloadEntry, 0) for _, service := range services { protocol := common.HTTP if service.Metadata != nil && service.Metadata["protocol"] != "" { protocol = common.ParseProtocol(service.Metadata["protocol"]) } port := &v1alpha3.ServicePort{ Name: protocol.String(), Number: uint32(service.Port), Protocol: protocol.String(), } if len(portList) == 0 { portList = append(portList, port) } endpoint := &v1alpha3.WorkloadEntry{ Address: service.Ip, Ports: map[string]uint32{port.Protocol: port.Number}, Labels: service.Metadata, } endpoints = append(endpoints, endpoint) } se := &v1alpha3.ServiceEntry{ Hosts: []string{host}, Ports: portList, Location: v1alpha3.ServiceEntry_MESH_INTERNAL, Resolution: v1alpha3.ServiceEntry_STATIC, Endpoints: endpoints, } return se } func (w *watcher) Stop() { w.mutex.Lock() defer w.mutex.Unlock() if w.addrProvider != nil { w.addrProvider.Stop() } for key := range w.WatchingServices { s := strings.Split(key, DefaultJoiner) err := w.unsubscribe(s[0], s[1]) if err == nil { delete(w.WatchingServices, key) } // clean the cache suffix := strings.Join([]string{s[0], w.NacosNamespace, "nacos"}, common.DotSeparator) suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen) host := strings.Join([]string{s[1], suffix}, common.DotSeparator) w.cache.DeleteServiceWrapper(host) } w.isStop = true w.namingClient.CloseClient() close(w.stop) w.Ready(false) } func (w *watcher) IsHealthy() bool { return w.Status == provider.Healthy } func (w *watcher) GetRegistryType() string { return w.RegistryType.String() } func shouldSubscribe(serviceName string) bool { prefixFilters := []string{"consumers:"} fullFilters := []string{""} for _, f := range prefixFilters { if strings.HasPrefix(serviceName, f) { return false } } for _, f := range fullFilters { if serviceName == f { return false } } return true }