registry/memory/cache.go (315 lines of code) (raw):
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memory
import (
"encoding/json"
"sort"
"strconv"
"sync"
"time"
higressconfig "github.com/alibaba/higress/pkg/config"
"github.com/alibaba/higress/registry"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/wrapperspb"
extensions "istio.io/api/extensions/v1alpha1"
"istio.io/api/networking/v1alpha3"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/pkg/log"
"github.com/alibaba/higress/pkg/common"
ingress "github.com/alibaba/higress/pkg/ingress/kube/common"
)
// Cache is the contract of the in-memory registry cache. The per-method notes
// below describe what the store implementation in this file does.
type Cache interface {
// UpdateServiceWrapper stores data under service, queues it as an incremental
// update and cancels a pending deferred delete for the same service.
UpdateServiceWrapper(service string, data *ServiceWrapper)
// DeleteServiceWrapper queues a known service as deleted and marks it for
// deferred removal; the entry itself is dropped by PurgeStaleService.
DeleteServiceWrapper(service string)
// UpdateConfigCache stores config under kind/key. With forceDelete set, key is
// removed from every kind instead. A nil config without forceDelete is ignored.
UpdateConfigCache(kind config.GroupVersionKind, key string, config *config.Config, forceDelete bool)
// GetAllConfigs returns the cached configs of the given kind. For
// gvk.WasmPlugin the cached entries are merged into one generated config.
GetAllConfigs(kind config.GroupVersionKind) map[string]*config.Config
// PurgeStaleService removes every service previously marked by
// DeleteServiceWrapper.
PurgeStaleService()
// UpdateServiceEntryEndpointWrapper updates the locality and labels of the
// endpoint of service whose address equals ip.
UpdateServiceEntryEndpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string)
// GetServiceByEndpoints maps each matching service key to the de-duplicated
// versions found on endpoints contained in the given endpoint set.
GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string
// GetAllServiceEntry returns deep copies of all service entries that have at
// least one host, sorted by first host in descending order.
GetAllServiceEntry() []*v1alpha3.ServiceEntry
// GetAllServiceWrapper returns deep copies of all cached service wrappers and
// clears the pending incremental lists.
GetAllServiceWrapper() []*ServiceWrapper
// GetAllDestinationRuleWrapper returns the destination rule wrappers attached
// to cached services plus those cached as DestinationRule configs, and clears
// the pending incremental lists.
GetAllDestinationRuleWrapper() []*ingress.WrapperDestinationRule
// GetIncrementalServiceWrapper returns deep copies of the wrappers queued as
// updated and deleted, then clears both queues.
GetIncrementalServiceWrapper() (updatedList []*ServiceWrapper, deletedList []*ServiceWrapper)
// RemoveEndpointByIp removes the endpoint with the given address from the
// services indexed under that IP.
RemoveEndpointByIp(ip string)
}
// NewCache returns an empty Cache backed by a store whose lock, maps and
// pending-change slices are all allocated up front.
func NewCache() Cache {
	c := new(store)
	c.mux = new(sync.RWMutex)
	c.sew = map[string]*ServiceWrapper{}
	c.configs = map[string]map[string]*config.Config{}
	c.toBeUpdated = []*ServiceWrapper{}
	c.toBeDeleted = []*ServiceWrapper{}
	c.ip2services = map[string]map[string]bool{}
	c.deferedDelete = map[string]struct{}{}
	return c
}
// store is the Cache implementation returned by NewCache. Its methods take mux
// before touching the other fields.
type store struct {
// mux guards every other field of the store.
mux *sync.RWMutex
// sew holds the cached service wrappers keyed by service name.
sew map[string]*ServiceWrapper
// configs holds cached configs, keyed first by kind.String() and then by config key.
configs map[string]map[string]*config.Config
// toBeUpdated lists wrappers changed since the pending lists were last cleared.
toBeUpdated []*ServiceWrapper
// toBeDeleted lists wrappers deleted since the pending lists were last cleared.
toBeDeleted []*ServiceWrapper
// ip2services maps an endpoint address to the set of service names using it.
ip2services map[string]map[string]bool
// deferedDelete is the set of services to drop from sew on PurgeStaleService.
deferedDelete map[string]struct{}
}
func (s *store) GetAllConfigs(kind config.GroupVersionKind) map[string]*config.Config {
s.mux.Lock()
defer s.mux.Unlock()
cfgs, exist := s.configs[kind.String()]
if !exist {
return map[string]*config.Config{}
}
if kind == gvk.WasmPlugin {
pluginConfig := ®istry.WasmPluginConfig{}
var ns string
for _, cfg := range cfgs {
ns = cfg.Namespace
rule := cfg.Spec.(*registry.McpServerRule)
pluginConfig.Rules = append(pluginConfig.Rules, rule)
}
rulesBytes, err := json.Marshal(pluginConfig)
if err != nil {
log.Errorf("marshal mcp wasm plugin config error %v", err)
return map[string]*config.Config{}
}
pbs := &structpb.Struct{}
if err = protojson.Unmarshal(rulesBytes, pbs); err != nil {
log.Errorf("unmarshal mcp wasm plugin config error %v", err)
return map[string]*config.Config{}
}
wasmPlugin := &extensions.WasmPlugin{
ImagePullPolicy: extensions.PullPolicy_Always,
Phase: extensions.PluginPhase_UNSPECIFIED_PHASE,
Priority: &wrapperspb.Int32Value{Value: 30},
PluginConfig: pbs,
Url: higressconfig.McpServerWasmImageUrl,
}
return map[string]*config.Config{"wasm": &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.WasmPlugin,
Name: "istio-autogenerated-mcp-wasmplugin",
Namespace: ns,
},
Spec: wasmPlugin,
}}
}
return cfgs
}
// UpdateConfigCache adds, replaces or removes a cached config.
//
// With forceDelete set, key is removed from the configs of every kind (kind and
// cfg are not consulted). Otherwise cfg is stored under kind/key, creating the
// per-kind map on first use; a nil cfg is ignored in that case.
func (s *store) UpdateConfigCache(kind config.GroupVersionKind, key string, cfg *config.Config, forceDelete bool) {
	if !forceDelete && cfg == nil {
		return
	}

	s.mux.Lock()
	defer s.mux.Unlock()

	if forceDelete {
		for _, perKind := range s.configs {
			delete(perKind, key)
		}
		log.Infof("Delete config %s in cache", key)
		return
	}

	kindName := kind.String()
	perKind, known := s.configs[kindName]
	if !known {
		perKind = make(map[string]*config.Config)
		s.configs[kindName] = perKind
	}
	// Only the log line differs between adding and replacing an entry.
	if _, present := perKind[key]; present {
		log.Infof("Update kind %s config %s", kindName, key)
	} else {
		log.Infof("Add kind %s config %s", kindName, key)
	}
	perKind[key] = cfg
}
// UpdateServiceEntryEndpointWrapper updates the first endpoint of service whose
// address equals ip. Unknown services and addresses are ignored.
//
//   - Locality is set to regionId, or "regionId/zoneId" when zoneId is also
//     given; it is left untouched when regionId is empty.
//   - Every entry of labels is merged into the endpoint labels. For the Dubbo
//     protocol the "version" label is stored under "appversion" instead.
func (s *store) UpdateServiceEntryEndpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string) {
	s.mux.Lock()
	defer s.mux.Unlock()

	se, exist := s.sew[service]
	if !exist {
		return
	}

	for _, ep := range se.ServiceEntry.Endpoints {
		if ep.Address != ip {
			continue
		}

		if len(regionId) != 0 {
			ep.Locality = regionId
			if len(zoneId) != 0 {
				ep.Locality = regionId + "/" + zoneId
			}
		}

		// An endpoint may carry no labels yet; writing into a nil map panics.
		if len(labels) != 0 && ep.Labels == nil {
			ep.Labels = make(map[string]string, len(labels))
		}
		isDubbo := protocol == common.Dubbo.String()
		for k, v := range labels {
			if isDubbo && k == "version" {
				ep.Labels["appversion"] = v
				continue
			}
			ep.Labels[k] = v
		}

		// ep is a pointer into the endpoint slice, so the changes above are
		// already visible through the cached service entry.
		return
	}
}
// UpdateServiceWrapper stores data as the current wrapper of service and queues
// it as an incremental update. The creation time of an already cached wrapper
// is carried over; a new service is stamped with the current time. A pending
// deferred delete for the same service is cancelled.
func (s *store) UpdateServiceWrapper(service string, data *ServiceWrapper) {
	s.mux.Lock()
	defer s.mux.Unlock()

	previous, known := s.sew[service]
	if known {
		data.SetCreateTime(previous.GetCreateTime())
	} else {
		data.SetCreateTime(time.Now())
	}
	log.Debugf("mcp service entry update, name:%s, data:%v", service, data)

	s.sew[service] = data
	s.toBeUpdated = append(s.toBeUpdated, data)

	// A service that is updated again must survive the next purge.
	if _, pending := s.deferedDelete[service]; pending {
		delete(s.deferedDelete, service)
		log.Debugf("service in deferedDelete updated, host:%s", service)
	}
	log.Infof("ServiceEntry updated, host:%s", service)
}
// DeleteServiceWrapper queues the cached wrapper of service as deleted and
// marks the service for deferred removal. The entry stays in the cache until
// PurgeStaleService runs. Unknown services are ignored.
func (s *store) DeleteServiceWrapper(service string) {
	s.mux.Lock()
	defer s.mux.Unlock()

	wrapper, known := s.sew[service]
	if !known {
		return
	}
	s.deferedDelete[service] = struct{}{}
	s.toBeDeleted = append(s.toBeDeleted, wrapper)
}
// PurgeStaleService drops every service marked for deferred deletion from the
// cache and empties the deferred-delete set.
// It should only be called when reconcile is done.
func (s *store) PurgeStaleService() {
	s.mux.Lock()
	defer s.mux.Unlock()

	// Deleting the key being visited is safe while ranging over a map.
	for host := range s.deferedDelete {
		delete(s.deferedDelete, host)
		delete(s.sew, host)
		log.Infof("ServiceEntry deleted, host:%s", host)
	}
}
// GetServiceByEndpoints returns the services that have an endpoint whose
// "address:port" (port looked up under the protocol name) is a key of
// endpoints and whose versionKey label is a key of requestVersions.
//
// The result has the following shape:
//
//	key:    serviceName + common.SpecialSeparator + suffix
//	values: the matching versions, sorted and without duplicates, e.g. ["v1", "v2"]
func (s *store) GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string {
	s.mux.RLock()
	defer s.mux.RUnlock()

	portName := protocol.String()
	matched := make(map[string][]string)
	for _, wrapper := range s.sew {
		for _, ep := range wrapper.ServiceEntry.Endpoints {
			port, ok := ep.Ports[portName]
			if !ok {
				continue
			}
			addr := ep.Address + common.ColonSeparator + strconv.Itoa(int(port))
			if _, ok := endpoints[addr]; !ok {
				continue
			}
			version, ok := ep.Labels[versionKey]
			if !ok {
				continue
			}
			if _, ok := requestVersions[version]; !ok {
				continue
			}
			key := wrapper.ServiceName + common.SpecialSeparator + wrapper.Suffix
			matched[key] = append(matched[key], version)
		}
	}

	// Sort each version list and compact equal neighbours in place. Every list
	// holds at least one element because keys are only created by an append.
	for key, versions := range matched {
		sort.Strings(versions)
		unique := versions[:1]
		for _, v := range versions[1:] {
			if v != unique[len(unique)-1] {
				unique = append(unique, v)
			}
		}
		matched[key] = unique
	}
	return matched
}
// GetAllServiceEntry returns deep copies of all cached ServiceEntry objects for
// xds push. Entries without hosts are skipped; the result is ordered by first
// host, descending.
func (s *store) GetAllServiceEntry() []*v1alpha3.ServiceEntry {
	s.mux.RLock()
	defer s.mux.RUnlock()

	entries := []*v1alpha3.ServiceEntry{}
	for _, wrapper := range s.sew {
		if se := wrapper.ServiceEntry; len(se.Hosts) > 0 {
			entries = append(entries, se.DeepCopy())
		}
	}

	byFirstHostDesc := func(a, b int) bool {
		return entries[a].Hosts[0] > entries[b].Hosts[0]
	}
	sort.Slice(entries, byFirstHostDesc)
	return entries
}
// GetAllServiceWrapper returns deep copies of all cached ServiceWrapper objects
// for xds push and clears the pending incremental update/delete lists.
//
// The write lock is taken (not the read lock) because clearing the pending
// lists mutates the store; doing that under RLock races with other readers.
func (s *store) GetAllServiceWrapper() []*ServiceWrapper {
	s.mux.Lock()
	defer s.mux.Unlock()
	// Deferred calls run last-in first-out, so the lists are cleared while the
	// write lock is still held.
	defer s.cleanUpdateAndDeleteArray()

	sewList := make([]*ServiceWrapper, 0, len(s.sew))
	for _, serviceEntryWrapper := range s.sew {
		sewList = append(sewList, serviceEntryWrapper.DeepCopy())
	}
	return sewList
}
// GetAllDestinationRuleWrapper returns all destination rule wrappers for xds
// push: deep copies of the ones attached to cached services, followed by one
// wrapper per config cached under gvk.DestinationRule (keyed by the rule host).
// It also clears the pending incremental update/delete lists.
//
// The write lock is taken (not the read lock) because clearing the pending
// lists mutates the store; doing that under RLock races with other readers.
func (s *store) GetAllDestinationRuleWrapper() []*ingress.WrapperDestinationRule {
	s.mux.Lock()
	defer s.mux.Unlock()
	// Deferred calls run last-in first-out, so the lists are cleared while the
	// write lock is still held.
	defer s.cleanUpdateAndDeleteArray()

	drwList := make([]*ingress.WrapperDestinationRule, 0)
	for _, serviceEntryWrapper := range s.sew {
		if serviceEntryWrapper.DestinationRuleWrapper != nil {
			drwList = append(drwList, serviceEntryWrapper.DeepCopy().DestinationRuleWrapper)
		}
	}

	for key, cfg := range s.configs[gvk.DestinationRule.String()] {
		dr, ok := cfg.Spec.(*v1alpha3.DestinationRule)
		if !ok {
			// Skip foreign specs instead of panicking on the type assertion.
			log.Errorf("unexpected spec type %T for destination rule config %s", cfg.Spec, key)
			continue
		}
		drwList = append(drwList, &ingress.WrapperDestinationRule{
			DestinationRule: dr,
			ServiceKey:      ingress.ServiceKey{ServiceFQDN: dr.Host},
		})
	}
	return drwList
}
// GetIncrementalServiceWrapper returns deep copies of the wrappers queued as
// updated and as deleted (in that order) for xds push, then clears both
// queues. Both results are non-nil even when empty.
//
// The write lock is taken (not the read lock) because draining the queues
// mutates the store; doing that under RLock races with other readers.
func (s *store) GetIncrementalServiceWrapper() ([]*ServiceWrapper, []*ServiceWrapper) {
	s.mux.Lock()
	defer s.mux.Unlock()
	// Deferred calls run last-in first-out, so the queues are cleared while the
	// write lock is still held.
	defer s.cleanUpdateAndDeleteArray()

	updatedList := make([]*ServiceWrapper, 0, len(s.toBeUpdated))
	for _, serviceEntryWrapper := range s.toBeUpdated {
		updatedList = append(updatedList, serviceEntryWrapper.DeepCopy())
	}
	deletedList := make([]*ServiceWrapper, 0, len(s.toBeDeleted))
	for _, serviceEntryWrapper := range s.toBeDeleted {
		deletedList = append(deletedList, serviceEntryWrapper.DeepCopy())
	}
	return updatedList, deletedList
}
// cleanUpdateAndDeleteArray resets both pending incremental lists to nil.
// It does not lock by itself; since it mutates the store, callers should hold
// s.mux for writing.
func (s *store) cleanUpdateAndDeleteArray() {
	s.toBeUpdated, s.toBeDeleted = nil, nil
}
// updateIpMap records, for every endpoint address of data, that service uses
// that address. Existing entries are kept; a missing or nil set is created.
// It does not lock by itself.
func (s *store) updateIpMap(service string, data *ServiceWrapper) {
	for _, endpoint := range data.ServiceEntry.Endpoints {
		addr := endpoint.Address
		users := s.ip2services[addr]
		if users == nil {
			users = make(map[string]bool)
			s.ip2services[addr] = users
		}
		users[service] = true
	}
}
// RemoveEndpointByIp removes every endpoint whose address equals ip from the
// services indexed under that IP in ip2services, drops the index entry and
// queues each still-cached service as an incremental update. An IP that is not
// indexed is ignored.
//
// All matching endpoints are removed, not just the first one: the index entry
// for ip is deleted as a whole, so an endpoint left behind on the same address
// could never be reached through this method again.
func (s *store) RemoveEndpointByIp(ip string) {
	s.mux.Lock()
	defer s.mux.Unlock()

	services, has := s.ip2services[ip]
	if !has {
		return
	}
	delete(s.ip2services, ip)

	for service := range services {
		data, exist := s.sew[service]
		if !exist {
			continue
		}

		// Filter in place, reusing the backing array of the endpoint slice.
		endpoints := data.ServiceEntry.Endpoints
		kept := endpoints[:0]
		for _, ep := range endpoints {
			if ep.Address != ip {
				kept = append(kept, ep)
			}
		}
		// Clear the vacated tail so removed endpoints are not kept alive by the
		// backing array.
		for i := len(kept); i < len(endpoints); i++ {
			endpoints[i] = nil
		}
		data.ServiceEntry.Endpoints = kept

		// As before, the service is queued even if no endpoint matched.
		s.toBeUpdated = append(s.toBeUpdated, data)
	}
}