registry/memory/cache.go (315 lines of code) (raw):

// Copyright (c) 2022 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package memory import ( "encoding/json" "sort" "strconv" "sync" "time" higressconfig "github.com/alibaba/higress/pkg/config" "github.com/alibaba/higress/registry" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/wrapperspb" extensions "istio.io/api/extensions/v1alpha1" "istio.io/api/networking/v1alpha3" "istio.io/istio/pkg/config" "istio.io/istio/pkg/config/schema/gvk" "istio.io/pkg/log" "github.com/alibaba/higress/pkg/common" ingress "github.com/alibaba/higress/pkg/ingress/kube/common" ) type Cache interface { UpdateServiceWrapper(service string, data *ServiceWrapper) DeleteServiceWrapper(service string) UpdateConfigCache(kind config.GroupVersionKind, key string, config *config.Config, forceDelete bool) GetAllConfigs(kind config.GroupVersionKind) map[string]*config.Config PurgeStaleService() UpdateServiceEntryEndpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string) GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string GetAllServiceEntry() []*v1alpha3.ServiceEntry GetAllServiceWrapper() []*ServiceWrapper GetAllDestinationRuleWrapper() []*ingress.WrapperDestinationRule GetIncrementalServiceWrapper() (updatedList []*ServiceWrapper, deletedList []*ServiceWrapper) RemoveEndpointByIp(ip string) } func NewCache() Cache { return &store{ mux: &sync.RWMutex{}, sew: make(map[string]*ServiceWrapper), configs: make(map[string]map[string]*config.Config), toBeUpdated: make([]*ServiceWrapper, 0), toBeDeleted: make([]*ServiceWrapper, 0), ip2services: make(map[string]map[string]bool), deferedDelete: make(map[string]struct{}), } } type store struct { mux *sync.RWMutex sew map[string]*ServiceWrapper configs map[string]map[string]*config.Config toBeUpdated []*ServiceWrapper toBeDeleted []*ServiceWrapper ip2services map[string]map[string]bool deferedDelete map[string]struct{} } func (s *store) GetAllConfigs(kind config.GroupVersionKind) map[string]*config.Config { s.mux.Lock() defer s.mux.Unlock() cfgs, exist := s.configs[kind.String()] if !exist { return map[string]*config.Config{} } if kind == gvk.WasmPlugin { pluginConfig := &registry.WasmPluginConfig{} var ns string for _, cfg := range cfgs { ns = cfg.Namespace rule := cfg.Spec.(*registry.McpServerRule) pluginConfig.Rules = append(pluginConfig.Rules, rule) } rulesBytes, err := json.Marshal(pluginConfig) if err != nil { log.Errorf("marshal mcp wasm plugin config error %v", err) return map[string]*config.Config{} } pbs := &structpb.Struct{} if err = protojson.Unmarshal(rulesBytes, pbs); err != nil { log.Errorf("unmarshal mcp wasm plugin config error %v", err) return map[string]*config.Config{} } wasmPlugin := &extensions.WasmPlugin{ ImagePullPolicy: extensions.PullPolicy_Always, Phase: extensions.PluginPhase_UNSPECIFIED_PHASE, Priority: &wrapperspb.Int32Value{Value: 30}, PluginConfig: pbs, Url: higressconfig.McpServerWasmImageUrl, } return map[string]*config.Config{"wasm": &config.Config{ Meta: config.Meta{ GroupVersionKind: gvk.WasmPlugin, Name: "istio-autogenerated-mcp-wasmplugin", Namespace: ns, }, Spec: wasmPlugin, }} } return cfgs } func (s *store) UpdateConfigCache(kind config.GroupVersionKind, key string, cfg *config.Config, forceDelete bool) { if cfg == nil && !forceDelete { return } s.mux.Lock() if forceDelete { for _, allConfigs := range s.configs { delete(allConfigs, key) } log.Infof("Delete config %s in cache", key) } else { if _, exist := s.configs[kind.String()]; !exist { s.configs[kind.String()] = make(map[string]*config.Config) } if _, exist := s.configs[kind.String()][key]; exist { log.Infof("Update kind %s config %s", kind.String(), key) } else { log.Infof("Add kind %s config %s", kind.String(), key) } s.configs[kind.String()][key] = cfg } s.mux.Unlock() } func (s *store) UpdateServiceEntryEndpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string) { s.mux.Lock() defer s.mux.Unlock() if se, exist := s.sew[service]; exist { idx := -1 for i, ep := range se.ServiceEntry.Endpoints { if ep.Address == ip { idx = i if len(regionId) != 0 { ep.Locality = regionId if len(zoneId) != 0 { ep.Locality = regionId + "/" + zoneId } } if labels != nil { for k, v := range labels { if protocol == common.Dubbo.String() && k == "version" { ep.Labels["appversion"] = v continue } ep.Labels[k] = v } } if idx != -1 { se.ServiceEntry.Endpoints[idx] = ep } return } } } return } func (s *store) UpdateServiceWrapper(service string, data *ServiceWrapper) { s.mux.Lock() defer s.mux.Unlock() if old, exist := s.sew[service]; exist { data.SetCreateTime(old.GetCreateTime()) } else { data.SetCreateTime(time.Now()) } log.Debugf("mcp service entry update, name:%s, data:%v", service, data) s.toBeUpdated = append(s.toBeUpdated, data) s.sew[service] = data // service is updated, should not be deleted if _, ok := s.deferedDelete[service]; ok { delete(s.deferedDelete, service) log.Debugf("service in deferedDelete updated, host:%s", service) } log.Infof("ServiceEntry updated, host:%s", service) } func (s *store) DeleteServiceWrapper(service string) { s.mux.Lock() defer s.mux.Unlock() if data, exist := s.sew[service]; exist { s.toBeDeleted = append(s.toBeDeleted, data) s.deferedDelete[service] = struct{}{} } } // should only be called when reconcile is done func (s *store) PurgeStaleService() { s.mux.Lock() defer s.mux.Unlock() for service := range s.deferedDelete { delete(s.sew, service) delete(s.deferedDelete, service) log.Infof("ServiceEntry deleted, host:%s", service) } } // GetServiceByEndpoints get the list of services of which "address:port" contained by the endpoints // and the version of the service contained by the requestVersions. The result format is as below: // key: serviceName + "#@" + suffix // values: ["v1", "v2"] which has removed duplication func (s *store) GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string { s.mux.RLock() defer s.mux.RUnlock() result := make(map[string][]string) for _, serviceEntryWrapper := range s.sew { for _, workload := range serviceEntryWrapper.ServiceEntry.Endpoints { port, exist := workload.Ports[protocol.String()] if !exist { continue } endpoint := workload.Address + common.ColonSeparator + strconv.Itoa(int(port)) if _, hit := endpoints[endpoint]; hit { if version, has := workload.Labels[versionKey]; has { if _, in := requestVersions[version]; in { key := serviceEntryWrapper.ServiceName + common.SpecialSeparator + serviceEntryWrapper.Suffix result[key] = append(result[key], version) } } } } } // remove duplication for key, versions := range result { sort.Strings(versions) i := 0 for j := 1; j < len(versions); j++ { if versions[j] != versions[i] { i++ versions[i] = versions[j] } } result[key] = versions[:i+1] } return result } // GetAllServiceEntry get all ServiceEntry in the store for xds push func (s *store) GetAllServiceEntry() []*v1alpha3.ServiceEntry { s.mux.RLock() defer s.mux.RUnlock() seList := make([]*v1alpha3.ServiceEntry, 0) for _, serviceEntryWrapper := range s.sew { if len(serviceEntryWrapper.ServiceEntry.Hosts) == 0 { continue } seList = append(seList, serviceEntryWrapper.ServiceEntry.DeepCopy()) } sort.Slice(seList, func(i, j int) bool { return seList[i].Hosts[0] > seList[j].Hosts[0] }) return seList } // GetAllServiceWrapper get all ServiceWrapper in the store for xds push func (s *store) GetAllServiceWrapper() []*ServiceWrapper { s.mux.RLock() defer s.mux.RUnlock() defer s.cleanUpdateAndDeleteArray() sewList := make([]*ServiceWrapper, 0) for _, serviceEntryWrapper := range s.sew { sewList = append(sewList, serviceEntryWrapper.DeepCopy()) } return sewList } // GetAllDestinationRuleWrapper get all DestinationRuleWrapper in the store for xds push func (s *store) GetAllDestinationRuleWrapper() []*ingress.WrapperDestinationRule { s.mux.RLock() defer s.mux.RUnlock() defer s.cleanUpdateAndDeleteArray() drwList := make([]*ingress.WrapperDestinationRule, 0) for _, serviceEntryWrapper := range s.sew { if serviceEntryWrapper.DestinationRuleWrapper != nil { drwList = append(drwList, serviceEntryWrapper.DeepCopy().DestinationRuleWrapper) } } configFromMcp := s.configs[gvk.DestinationRule.String()] for _, cfg := range configFromMcp { dr := cfg.Spec.(*v1alpha3.DestinationRule) drwList = append(drwList, &ingress.WrapperDestinationRule{ DestinationRule: dr, ServiceKey: ingress.ServiceKey{ServiceFQDN: dr.Host}, }) } return drwList } // GetIncrementalServiceWrapper get incremental ServiceWrapper in the store for xds push func (s *store) GetIncrementalServiceWrapper() ([]*ServiceWrapper, []*ServiceWrapper) { s.mux.RLock() defer s.mux.RUnlock() defer s.cleanUpdateAndDeleteArray() updatedList := make([]*ServiceWrapper, 0) for _, serviceEntryWrapper := range s.toBeUpdated { updatedList = append(updatedList, serviceEntryWrapper.DeepCopy()) } deletedList := make([]*ServiceWrapper, 0) for _, serviceEntryWrapper := range s.toBeDeleted { deletedList = append(deletedList, serviceEntryWrapper.DeepCopy()) } return updatedList, deletedList } func (s *store) cleanUpdateAndDeleteArray() { s.toBeUpdated = nil s.toBeDeleted = nil } func (s *store) updateIpMap(service string, data *ServiceWrapper) { for _, ep := range data.ServiceEntry.Endpoints { if s.ip2services[ep.Address] == nil { s.ip2services[ep.Address] = make(map[string]bool) } s.ip2services[ep.Address][service] = true } } func (s *store) RemoveEndpointByIp(ip string) { s.mux.Lock() defer s.mux.Unlock() services, has := s.ip2services[ip] if !has { return } delete(s.ip2services, ip) for service := range services { if data, exist := s.sew[service]; exist { idx := -1 for i, ep := range data.ServiceEntry.Endpoints { if ep.Address == ip { idx = i break } } if idx != -1 { data.ServiceEntry.Endpoints = append(data.ServiceEntry.Endpoints[:idx], data.ServiceEntry.Endpoints[idx+1:]...) } s.toBeUpdated = append(s.toBeUpdated, data) } } }