pilot/pkg/model/xds_cache.go (289 lines of code) (raw):

// Copyright Istio Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package model import ( "fmt" "sync" "time" ) import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/google/go-cmp/cmp" "github.com/hashicorp/golang-lru/simplelru" "google.golang.org/protobuf/testing/protocmp" "istio.io/pkg/monitoring" ) import ( "github.com/apache/dubbo-go-pixiu/pilot/pkg/features" "github.com/apache/dubbo-go-pixiu/pkg/config" "github.com/apache/dubbo-go-pixiu/pkg/util/sets" ) func init() { monitoring.MustRegister(xdsCacheReads) monitoring.MustRegister(xdsCacheEvictions) monitoring.MustRegister(xdsCacheSize) } var ( xdsCacheReads = monitoring.NewSum( "xds_cache_reads", "Total number of xds cache xdsCacheReads.", monitoring.WithLabels(typeTag), ) xdsCacheEvictions = monitoring.NewSum( "xds_cache_evictions", "Total number of xds cache evictions.", ) xdsCacheSize = monitoring.NewGauge( "xds_cache_size", "Current size of xds cache", ) xdsCacheHits = xdsCacheReads.With(typeTag.Value("hit")) xdsCacheMisses = xdsCacheReads.With(typeTag.Value("miss")) ) func hit() { if features.EnableXDSCacheMetrics { xdsCacheHits.Increment() } } func miss() { if features.EnableXDSCacheMetrics { xdsCacheMisses.Increment() } } func size(cs int) { if features.EnableXDSCacheMetrics { xdsCacheSize.Record(float64(cs)) } } func indexConfig(configIndex map[ConfigKey]sets.Set, k string, dependentConfigs []ConfigKey) { for _, cfg := range dependentConfigs { if configIndex[cfg] == nil { configIndex[cfg] = sets.New() } configIndex[cfg].Insert(k) } } func clearIndexConfig(configIndex map[ConfigKey]sets.Set, k string, dependentConfigs []ConfigKey) { for _, cfg := range dependentConfigs { index := configIndex[cfg] if index != nil { index.Delete(k) if index.IsEmpty() { delete(configIndex, cfg) } } } } func indexType(typeIndex map[config.GroupVersionKind]sets.Set, k string, dependentTypes []config.GroupVersionKind) { for _, t := range dependentTypes { if typeIndex[t] == nil { typeIndex[t] = sets.New() } typeIndex[t].Insert(k) } } func clearIndexType(typeIndex map[config.GroupVersionKind]sets.Set, k string, dependentTypes []config.GroupVersionKind) { for _, t := range dependentTypes { index := typeIndex[t] if index != nil { index.Delete(k) if index.IsEmpty() { delete(typeIndex, t) } } } } // XdsCacheEntry interface defines functions that should be implemented by // resources that can be cached. type XdsCacheEntry interface { // Key is the key to be used in cache. Key() string // DependentTypes are config types that this cache key is dependant on. // Whenever any configs of this type changes, we should invalidate this cache entry. // Note: DependentConfigs should be preferred wherever possible. DependentTypes() []config.GroupVersionKind // DependentConfigs is config items that this cache key is dependent on. // Whenever these configs change, we should invalidate this cache entry. DependentConfigs() []ConfigKey // Cacheable indicates whether this entry is valid for cache. For example // for EDS to be cacheable, the Endpoint should have corresponding service. Cacheable() bool } type CacheToken uint64 // XdsCache interface defines a store for caching XDS responses. // All operations are thread safe. type XdsCache interface { // Add adds the given XdsCacheEntry with the value for the given pushContext to the cache. // If the cache has been updated to a newer push context, the write will be dropped silently. // This ensures stale data does not overwrite fresh data when dealing with concurrent // writers. Add(entry XdsCacheEntry, pushRequest *PushRequest, value *discovery.Resource) // Get retrieves the cached value if it exists. The boolean indicates // whether the entry exists in the cache. Get(entry XdsCacheEntry) (*discovery.Resource, bool) // Clear removes the cache entries that are dependent on the configs passed. Clear(map[ConfigKey]struct{}) // ClearAll clears the entire cache. ClearAll() // Keys returns all currently configured keys. This is for testing/debug only Keys() []string // Snapshot returns a snapshot of all keys and values. This is for testing/debug only Snapshot() map[string]*discovery.Resource } // NewXdsCache returns an instance of a cache. func NewXdsCache() XdsCache { cache := &lruCache{ enableAssertions: features.EnableUnsafeAssertions, configIndex: map[ConfigKey]sets.Set{}, typesIndex: map[config.GroupVersionKind]sets.Set{}, } cache.store = newLru(cache.evict) return cache } // NewLenientXdsCache returns an instance of a cache that does not validate token based get/set and enable assertions. func NewLenientXdsCache() XdsCache { cache := &lruCache{ enableAssertions: false, configIndex: map[ConfigKey]sets.Set{}, typesIndex: map[config.GroupVersionKind]sets.Set{}, } cache.store = newLru(cache.evict) return cache } type lruCache struct { enableAssertions bool store simplelru.LRUCache // token stores the latest token of the store, used to prevent stale data overwrite. // It is refreshed when Clear or ClearAll are called token CacheToken mu sync.RWMutex configIndex map[ConfigKey]sets.Set typesIndex map[config.GroupVersionKind]sets.Set } var _ XdsCache = &lruCache{} func newLru(evictCallback simplelru.EvictCallback) simplelru.LRUCache { sz := features.XDSCacheMaxSize if sz <= 0 { sz = 20000 } l, err := simplelru.NewLRU(sz, evictCallback) if err != nil { panic(fmt.Errorf("invalid lru configuration: %v", err)) } return l } func (l *lruCache) evict(k interface{}, v interface{}) { if features.EnableXDSCacheMetrics { xdsCacheEvictions.Increment() } key := k.(string) value := v.(cacheValue) // we don't need to acquire locks, since this function is called when we write to the store clearIndexConfig(l.configIndex, key, value.dependentConfigs) clearIndexType(l.typesIndex, key, value.dependentTypes) } // assertUnchanged checks that a cache entry is not changed. This helps catch bad cache invalidation // We should never have a case where we overwrite an existing item with a new change. Instead, when // config sources change, Clear/ClearAll should be called. At this point, we may get multiple writes // because multiple writers may get cache misses concurrently, but they ought to generate identical // configuration. This also checks that our XDS config generation is deterministic, which is a very // important property. func (l *lruCache) assertUnchanged(key string, existing *discovery.Resource, replacement *discovery.Resource) { if l.enableAssertions { if existing == nil { // This is a new addition, not an update return } // Record time so that we can correlate when the error actually happened, since the async reporting // may be delayed t0 := time.Now() // This operation is really slow, which makes tests fail for unrelated reasons, so we process it async. go func() { if !cmp.Equal(existing, replacement, protocmp.Transform()) { warning := fmt.Errorf("assertion failed at %v, cache entry changed but not cleared for key %v: %v\n%v\n%v", t0, key, cmp.Diff(existing, replacement, protocmp.Transform()), existing, replacement) panic(warning) } }() } } func (l *lruCache) Add(entry XdsCacheEntry, pushReq *PushRequest, value *discovery.Resource) { if !entry.Cacheable() || pushReq == nil || pushReq.Start.Equal(time.Time{}) { return } // It will not overflow until year 2262 token := CacheToken(pushReq.Start.UnixNano()) k := entry.Key() l.mu.Lock() defer l.mu.Unlock() cur, f := l.store.Get(k) if f { // This is the stale resource if token < cur.(cacheValue).token || token < l.token { // entry may be stale, we need to drop it. This can happen when the cache is invalidated // after we call Get. return } if l.enableAssertions { l.assertUnchanged(k, cur.(cacheValue).value, value) } } if token < l.token { return } // we have to make sure we evict old entries with the same key // to prevent leaking in the index maps if old, ok := l.store.Get(k); ok { l.evict(k, old) } dependentConfigs := entry.DependentConfigs() dependentTypes := entry.DependentTypes() toWrite := cacheValue{value: value, token: token, dependentConfigs: dependentConfigs, dependentTypes: dependentTypes} l.store.Add(k, toWrite) l.token = token indexConfig(l.configIndex, k, dependentConfigs) indexType(l.typesIndex, k, dependentTypes) size(l.store.Len()) } type cacheValue struct { value *discovery.Resource token CacheToken dependentConfigs []ConfigKey dependentTypes []config.GroupVersionKind } func (l *lruCache) Get(entry XdsCacheEntry) (*discovery.Resource, bool) { if !entry.Cacheable() { return nil, false } k := entry.Key() l.mu.Lock() defer l.mu.Unlock() val, ok := l.store.Get(k) if !ok { miss() return nil, false } cv := val.(cacheValue) if cv.value == nil { miss() return nil, false } hit() return cv.value, true } func (l *lruCache) Clear(configs map[ConfigKey]struct{}) { l.mu.Lock() defer l.mu.Unlock() l.token = CacheToken(time.Now().UnixNano()) for ckey := range configs { referenced := l.configIndex[ckey] delete(l.configIndex, ckey) for key := range referenced { l.store.Remove(key) } tReferenced := l.typesIndex[ckey.Kind] delete(l.typesIndex, ckey.Kind) for key := range tReferenced { l.store.Remove(key) } } size(l.store.Len()) } func (l *lruCache) ClearAll() { l.mu.Lock() defer l.mu.Unlock() l.token = CacheToken(time.Now().UnixNano()) // Purge with an evict function would turn up to be pretty slow since // it runs the function for every key in the store, might be better to just // create a new store. l.store = newLru(l.evict) l.configIndex = map[ConfigKey]sets.Set{} l.typesIndex = map[config.GroupVersionKind]sets.Set{} size(l.store.Len()) } func (l *lruCache) Keys() []string { l.mu.RLock() defer l.mu.RUnlock() iKeys := l.store.Keys() keys := make([]string, 0, len(iKeys)) for _, ik := range iKeys { keys = append(keys, ik.(string)) } return keys } func (l *lruCache) Snapshot() map[string]*discovery.Resource { l.mu.RLock() defer l.mu.RUnlock() iKeys := l.store.Keys() res := make(map[string]*discovery.Resource, len(iKeys)) for _, ik := range iKeys { v, ok := l.store.Get(ik) if !ok { continue } res[ik.(string)] = v.(cacheValue).value } return res } // DisabledCache is a cache that is always empty type DisabledCache struct{} var _ XdsCache = &DisabledCache{} func (d DisabledCache) Add(key XdsCacheEntry, pushReq *PushRequest, value *discovery.Resource) {} func (d DisabledCache) Get(XdsCacheEntry) (*discovery.Resource, bool) { return nil, false } func (d DisabledCache) Clear(configsUpdated map[ConfigKey]struct{}) {} func (d DisabledCache) ClearAll() {} func (d DisabledCache) Keys() []string { return nil } func (d DisabledCache) Snapshot() map[string]*discovery.Resource { return nil }