registry/consul/watcher.go (295 lines of code) (raw):

// Copyright (c) 2022 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package consul import ( "strconv" "strings" "sync" "time" apiv1 "github.com/alibaba/higress/api/networking/v1" "github.com/alibaba/higress/pkg/common" provider "github.com/alibaba/higress/registry" "github.com/alibaba/higress/registry/memory" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api/watch" "istio.io/api/networking/v1alpha3" "istio.io/pkg/log" ) const ( ConsulHealthPassing = "passing" DefaultRefreshInterval = time.Second * 30 DefaultRefreshIntervalLimit = time.Second * 10 ) type watcher struct { provider.BaseWatcher apiv1.RegistryConfig serverAddress string consulClient *consulapi.Client consulCatalog *consulapi.Catalog WatchingServices map[string]bool watchers map[string]*watch.Plan RegistryType provider.ServiceRegistryType Status provider.WatcherStatus cache memory.Cache mutex *sync.Mutex stop chan struct{} isStop bool updateCacheWhenEmpty bool authOption provider.AuthOption } type WatcherOption func(w *watcher) func WithType(t string) WatcherOption { return func(w *watcher) { w.Type = t } } func WithName(name string) WatcherOption { return func(w *watcher) { w.Name = name } } func WithDomain(domain string) WatcherOption { return func(w *watcher) { w.Domain = domain } } func WithPort(port uint32) WatcherOption { return func(w *watcher) { w.Port = port } } func WithDatacenter(dataCenter string) WatcherOption { return func(w *watcher) { w.ConsulDatacenter = dataCenter } } func WithAuthOption(authOption provider.AuthOption) WatcherOption { return func(w *watcher) { w.authOption = authOption } } func WithServiceTag(serviceTag string) WatcherOption { return func(w *watcher) { w.ConsulServiceTag = strings.ToLower(strings.TrimSpace(serviceTag)) } } func WithRefreshInterval(refreshInterval int64) WatcherOption { return func(w *watcher) { if refreshInterval < int64(DefaultRefreshIntervalLimit) { refreshInterval = int64(DefaultRefreshIntervalLimit) } w.ConsulRefreshInterval = refreshInterval } } func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) { w := &watcher{ WatchingServices: make(map[string]bool), watchers: make(map[string]*watch.Plan), RegistryType: provider.Consul, Status: provider.UnHealthy, cache: cache, mutex: &sync.Mutex{}, stop: make(chan struct{}), } // Set default w.ConsulRefreshInterval = int64(DefaultRefreshInterval) // Set option for _, opt := range opts { opt(w) } // Init consul client w.serverAddress = w.Domain + ":" + strconv.Itoa(int(w.Port)) config := consulapi.DefaultConfig() config.Address = w.serverAddress config.Token = w.authOption.ConsulToken client, err := consulapi.NewClient(config) if err != nil { log.Errorf("[NewWatcher] NewWatcher consul, err:%v, consul address:%s", err, w.serverAddress) return nil, err } w.consulClient = client w.consulCatalog = client.Catalog() return w, nil } func (w *watcher) fetchAllServices() error { log.Infof("consul fetchAllServices") w.mutex.Lock() defer w.mutex.Unlock() if w.isStop { return nil } fetchedServices := make(map[string]bool) q := &consulapi.QueryOptions{} q.Datacenter = w.ConsulDatacenter q.Token = w.authOption.ConsulToken services, _, err := w.consulCatalog.Services(q) if err != nil { log.Errorf("consul fetch all services error:%v", err) return err } for serviceName, tags := range services { if w.filterTags(w.ConsulServiceTag, tags) { fetchedServices[serviceName] = true } } log.Infof("consul fetch services num:%d", len(fetchedServices)) for serviceName := range w.WatchingServices { if _, exist := fetchedServices[serviceName]; !exist { err := w.unsubscribe(serviceName) if err == nil { delete(w.WatchingServices, serviceName) } } } for serviceName := range fetchedServices { if _, exist := w.WatchingServices[serviceName]; !exist { if !w.shouldSubscribe(serviceName) { continue } err := w.subscribe(serviceName) if err == nil { w.WatchingServices[serviceName] = true } } } return nil } func (w *watcher) filterTags(consulTag string, tags []string) bool { if len(consulTag) == 0 { return true } if len(tags) == 0 { return false } for _, tag := range tags { if strings.ToLower(tag) == consulTag { return true } } return false } func (w *watcher) Run() { ticker := time.NewTicker(time.Duration(w.ConsulRefreshInterval)) defer ticker.Stop() w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10)) w.fetchAllServices() w.Ready(true) for { select { case <-ticker.C: w.fetchAllServices() case <-w.stop: return } } } func (w *watcher) Stop() { w.mutex.Lock() defer w.mutex.Unlock() for serviceName := range w.WatchingServices { err := w.unsubscribe(serviceName) if err == nil { delete(w.WatchingServices, serviceName) } // clean the cache suffix := strings.Join([]string{serviceName, w.ConsulDatacenter, w.Type}, common.DotSeparator) host := strings.ReplaceAll(suffix, common.Underscore, common.Hyphen) w.cache.DeleteServiceWrapper(host) } w.isStop = true close(w.stop) w.Ready(false) } func (w *watcher) IsHealthy() bool { return w.Status == provider.Healthy } func (w *watcher) GetRegistryType() string { return w.RegistryType.String() } func (w *watcher) unsubscribe(serviceName string) error { log.Infof("consul unsubscribe service, serviceName:%s", serviceName) if plan, ok := w.watchers[serviceName]; ok { plan.Stop() delete(w.watchers, serviceName) } return nil } func (w *watcher) shouldSubscribe(serviceName string) bool { return true } func (w *watcher) subscribe(serviceName string) error { log.Infof("consul subscribe service, serviceName:%s", serviceName) plan, err := watch.Parse(map[string]interface{}{ "type": "service", "service": serviceName, }) if err != nil { return err } plan.Handler = w.getSubscribeCallback(serviceName) plan.Token = w.authOption.ConsulToken plan.Datacenter = w.ConsulDatacenter go plan.Run(w.serverAddress) w.watchers[serviceName] = plan return nil } func (w *watcher) getSubscribeCallback(serviceName string) func(idx uint64, data interface{}) { suffix := strings.Join([]string{serviceName, w.ConsulDatacenter, w.Type}, common.DotSeparator) host := strings.ReplaceAll(suffix, common.Underscore, common.Hyphen) return func(idx uint64, data interface{}) { log.Infof("consul subscribe callback service, host:%s, serviceName:%s", host, serviceName) switch services := data.(type) { case []*consulapi.ServiceEntry: defer w.UpdateService() serviceEntry := w.generateServiceEntry(host, services) if serviceEntry != nil { log.Infof("consul update serviceEntry %s cache", host) w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{ ServiceEntry: serviceEntry, ServiceName: serviceName, Suffix: suffix, RegistryType: w.Type, RegistryName: w.Name, }) } else { log.Infof("consul serviceEntry %s is nil", host) //w.cache.DeleteServiceWrapper(host) } } } } func (w *watcher) generateServiceEntry(host string, services []*consulapi.ServiceEntry) *v1alpha3.ServiceEntry { portList := make([]*v1alpha3.ServicePort, 0) endpoints := make([]*v1alpha3.WorkloadEntry, 0) for _, service := range services { // service status: maintenance > critical > warning > passing if service.Checks.AggregatedStatus() != ConsulHealthPassing { continue } metaData := make(map[string]string, 0) if service.Service.Meta != nil { metaData = service.Service.Meta } protocol := common.HTTP if metaData["protocol"] != "" { protocol = common.ParseProtocol(metaData["protocol"]) } port := &v1alpha3.ServicePort{ Name: protocol.String(), Number: uint32(service.Service.Port), Protocol: protocol.String(), } if len(portList) == 0 { portList = append(portList, port) } endpoint := v1alpha3.WorkloadEntry{ Address: service.Service.Address, Ports: map[string]uint32{port.Protocol: port.Number}, Labels: metaData, } endpoints = append(endpoints, &endpoint) } if len(endpoints) == 0 { return nil } se := &v1alpha3.ServiceEntry{ Hosts: []string{host}, Ports: portList, Location: v1alpha3.ServiceEntry_MESH_INTERNAL, Resolution: v1alpha3.ServiceEntry_STATIC, Endpoints: endpoints, } return se }