pkg/util/xds/v3/cache.go (296 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Copyright 2018 Envoyproxy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

import (
	envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
	envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	"github.com/envoyproxy/go-control-plane/pkg/log"
	"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// Snapshot is a set of xDS resources and their versions, both looked up by
// resource type.
type Snapshot interface {
	// GetSupportedTypes returns a list of xDS types supported by this snapshot.
	GetSupportedTypes() []string

	// Consistent verifies that the dependent resources are exactly listed in the
	// snapshot:
	// - all EDS resources are listed by name in CDS resources
	// - all RDS resources are listed by name in LDS resources
	//
	// Note that clusters and listeners are requested without name references, so
	// Envoy will accept the snapshot list of clusters as-is even if it does not match
	// all references found in xDS.
	Consistent() error

	// GetResources selects snapshot resources by type.
	GetResources(typ string) map[string]types.Resource

	// GetVersion returns the version for a resource type.
	GetVersion(typ string) string

	// WithVersion creates a new snapshot with a different version for a given resource type.
	WithVersion(typ string, version string) Snapshot
}

// SnapshotCache is a snapshot-based envoy_cache that maintains a single versioned
// snapshot of responses per node. SnapshotCache consistently replies with the
// latest snapshot. For the protocol to work correctly in ADS mode, EDS/RDS
// requests are responded to only when all resources in the snapshot xDS response
// are named as part of the request. It is expected that the CDS response names
// all EDS clusters, and the LDS response names all RDS routes in a snapshot,
// to ensure that Envoy makes the request for all EDS clusters or RDS routes
// eventually.
//
// SnapshotCache can operate as a REST or regular xDS backend. The snapshot
// can be partial, e.g. only include RDS or EDS resources.
type SnapshotCache interface {
	envoy_cache.Cache

	// SetSnapshot sets a response snapshot for a node. For ADS, the snapshots
	// should have distinct versions and be internally consistent (e.g. all
	// referenced resources must be included in the snapshot).
	//
	// This method will cause the server to respond to all open watches, for which
	// the version differs from the snapshot version.
	SetSnapshot(node string, snapshot Snapshot) error

	// GetSnapshot gets the snapshot for a node.
	GetSnapshot(node string) (Snapshot, error)

	// HasSnapshot checks whether there is a snapshot present for a node.
	HasSnapshot(node string) bool

	// ClearSnapshot removes all status and snapshot information associated with
	// a node. It returns the removed snapshot, or nil if there was none.
	ClearSnapshot(node string) Snapshot

	// GetStatusInfo retrieves status information for a node ID.
	GetStatusInfo(string) StatusInfo

	// GetStatusKeys retrieves node IDs for all statuses.
	GetStatusKeys() []string
}

// SnapshotGenerator generates a snapshot of xDS resources for a given node.
type SnapshotGenerator interface {
	GenerateSnapshot(context.Context, *envoy_config_core_v3.Node) (Snapshot, error)
}

// snapshotCache is the SnapshotCache implementation returned by NewSnapshotCache.
type snapshotCache struct {
	// watchCount is an atomic counter incremented for each watch. This needs to
	// be the first field in the struct to guarantee that it is 64-bit aligned,
	// which is a requirement for atomic operations on 64-bit operands to work on
	// 32-bit machines.
	watchCount int64

	// log is the optional logger; methods skip logging when it is nil.
	log log.Logger

	// ads flag to hold responses until all resources are named
	ads bool

	// snapshots are cached resources indexed by node IDs
	snapshots map[string]Snapshot

	// status information for all nodes indexed by node IDs
	status map[string]*statusInfo

	// hash is the hashing function for Envoy nodes
	hash NodeHash

	// mu is held by the cache methods while they read or modify the snapshots
	// and status maps.
	mu sync.RWMutex
}

// NewSnapshotCache initializes a simple envoy_cache.
//
// ADS flag forces a delay in responding to streaming requests until all
// resources are explicitly named in the request. This avoids the problem of a
// partial request over a single stream for a subset of resources which would
// require generating a fresh version for acknowledgement. ADS flag requires
// snapshot consistency. For non-ADS case (and fetch), multiple partial
// requests are sent across multiple streams and re-using the snapshot version
// is OK.
//
// Logger is optional.
func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache {
	return &snapshotCache{
		log:       logger,
		ads:       ads,
		snapshots: make(map[string]Snapshot),
		status:    make(map[string]*statusInfo),
		hash:      hash,
	}
}

// SetSnapshot stores snapshot as the current snapshot for node and then
// answers the node's open watches whose requested version differs from the
// snapshot version of the watched type. Every such watch is discarded after
// the response attempt, including when respond declines to send.
//
// A nil snapshot is rejected with an error: storing it would make HasSnapshot
// report true while any later watch or fetch for the node would panic calling
// a method on the nil interface value.
func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) error {
	if snapshot == nil {
		return fmt.Errorf("nil snapshot for node %q", node)
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	// update the existing entry
	cache.snapshots[node] = snapshot

	info, ok := cache.status[node]
	if !ok {
		// no watch was ever opened for this node, nothing to trigger
		return nil
	}

	// The loop calls into the caller-supplied Snapshot and sends on a channel;
	// deferring the unlock keeps the per-node mutex from staying locked if one
	// of those calls panics and the panic is recovered further up the stack.
	info.mu.Lock()
	defer info.mu.Unlock()

	// trigger existing watches for which version changed
	for id, watch := range info.watches {
		version := snapshot.GetVersion(watch.Request.TypeUrl)
		if version == watch.Request.VersionInfo {
			continue
		}
		if cache.log != nil {
			cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version)
		}
		cache.respond(watch.Request, watch.Response, snapshot.GetResources(watch.Request.TypeUrl), version)

		// discard the watch
		delete(info.watches, id)
	}
	return nil
}

// GetSnapshot gets the snapshot for a node, and returns an error if not found.
func (cache *snapshotCache) GetSnapshot(node string) (Snapshot, error) {
	cache.mu.RLock()
	defer cache.mu.RUnlock()

	snap, ok := cache.snapshots[node]
	if !ok {
		return nil, fmt.Errorf("no snapshot found for node %s", node)
	}
	return snap, nil
}

// HasSnapshot reports whether a snapshot is stored for the node.
func (cache *snapshotCache) HasSnapshot(node string) bool {
	cache.mu.RLock()
	defer cache.mu.RUnlock()

	_, ok := cache.snapshots[node]
	return ok
}

// ClearSnapshot clears snapshot and info for a node. It returns the snapshot
// that was stored for the node, or nil if there was none.
func (cache *snapshotCache) ClearSnapshot(node string) Snapshot {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	snapshot := cache.snapshots[node]
	delete(cache.snapshots, node)
	delete(cache.status, node)
	return snapshot
}

// nameSet creates a map from a string slice to value true.
func nameSet(names []string) map[string]bool { set := make(map[string]bool) for _, name := range names { set[name] = true } return set } // superset checks that all resources are listed in the names set. func superset(names map[string]bool, resources map[string]types.Resource) error { for resourceName := range resources { if _, exists := names[resourceName]; !exists { return fmt.Errorf("%q not listed", resourceName) } } return nil } func (cache *snapshotCache) CreateDeltaWatch(*envoy_cache.DeltaRequest, stream.StreamState, chan envoy_cache.DeltaResponse) func() { return nil } // CreateWatch returns a watch for an xDS request. func (cache *snapshotCache) CreateWatch(request *envoy_cache.Request, _ stream.StreamState, responseChan chan envoy_cache.Response) func() { nodeID := cache.hash.ID(request.Node) cache.mu.Lock() defer cache.mu.Unlock() info, ok := cache.status[nodeID] if !ok { info = newStatusInfo(request.Node) cache.status[nodeID] = info } // update last watch request time info.mu.Lock() info.lastWatchRequestTime = time.Now() info.mu.Unlock() snapshot, exists := cache.snapshots[nodeID] version := "" if exists { version = snapshot.GetVersion(request.TypeUrl) } // if the requested version is up-to-date or missing a response, leave an open watch if !exists || request.VersionInfo == version { watchID := cache.nextWatchID() if cache.log != nil { cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo) } info.mu.Lock() info.watches[watchID] = ResponseWatch{Request: request, Response: responseChan} info.mu.Unlock() return cache.cancelWatch(nodeID, watchID) } // otherwise, the watch may be responded immediately cache.respond(request, responseChan, snapshot.GetResources(request.TypeUrl), version) return nil } func (cache *snapshotCache) nextWatchID() int64 { return atomic.AddInt64(&cache.watchCount, 1) } // cancellation function for cleaning stale watches func (cache 
*snapshotCache) cancelWatch(nodeID string, watchID int64) func() { return func() { // uses the envoy_cache mutex cache.mu.Lock() defer cache.mu.Unlock() if info, ok := cache.status[nodeID]; ok { info.mu.Lock() delete(info.watches, watchID) info.mu.Unlock() } } } // Respond to a watch with the snapshot value. The value channel should have capacity not to block. // TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46 func (cache *snapshotCache) respond(request *envoy_cache.Request, value chan envoy_cache.Response, resources map[string]types.Resource, version string) { // for ADS, the request names must match the snapshot names // if they do not, then the watch is never responded, and it is expected that envoy makes another request if len(request.ResourceNames) != 0 && cache.ads { if err := superset(nameSet(request.ResourceNames), resources); err != nil { if cache.log != nil { cache.log.Debugf("ADS mode: not responding to request: %v", err) } return } } if cache.log != nil { cache.log.Debugf("respond %s%v version %q with version %q", request.TypeUrl, request.ResourceNames, request.VersionInfo, version) } value <- createResponse(request, resources, version) } func createResponse(request *envoy_cache.Request, resources map[string]types.Resource, version string) envoy_cache.Response { filtered := make([]types.ResourceWithTTL, 0, len(resources)) // Reply only with the requested resources. Envoy may ask each resource // individually in a separate stream. It is ok to reply with the same version // on separate streams since requests do not share their response versions. 
if len(request.ResourceNames) != 0 { set := nameSet(request.ResourceNames) for name, resource := range resources { if set[name] { filtered = append(filtered, types.ResourceWithTTL{Resource: resource}) } } } else { for _, resource := range resources { filtered = append(filtered, types.ResourceWithTTL{Resource: resource}) } } return &envoy_cache.RawResponse{ Request: request, Version: version, Resources: filtered, } } // Fetch implements the envoy_cache fetch function. // Fetch is called on multiple streams, so responding to individual names with the same version works. // If there is a Deadline set on the context, the call will block until either the context is terminated // or there is a new update. func (cache *snapshotCache) Fetch(ctx context.Context, request *envoy_cache.Request) (envoy_cache.Response, error) { if _, hasDeadline := ctx.Deadline(); hasDeadline { return cache.blockingFetch(ctx, request) } nodeID := cache.hash.ID(request.Node) cache.mu.RLock() defer cache.mu.RUnlock() if snapshot, exists := cache.snapshots[nodeID]; exists { // Respond only if the request version is distinct from the current snapshot state. // It might be beneficial to hold the request since Envoy will re-attempt the refresh. 
version := snapshot.GetVersion(request.TypeUrl) if request.VersionInfo == version { if cache.log != nil { cache.log.Warnf("skip fetch: version up to date") } return nil, &types.SkipFetchError{} } resources := snapshot.GetResources(request.TypeUrl) out := createResponse(request, resources, version) return out, nil } return nil, fmt.Errorf("missing snapshot for %q", nodeID) } // blockingFetch will wait until either the context is terminated or new resources become available func (cache *snapshotCache) blockingFetch(ctx context.Context, request *envoy_cache.Request) (envoy_cache.Response, error) { responseChan := make(chan envoy_cache.Response, 1) cancelFunc := cache.CreateWatch(request, stream.StreamState{}, responseChan) if cancelFunc != nil { defer cancelFunc() } select { case <-ctx.Done(): // finished without an update return nil, &types.SkipFetchError{} case resp := <-responseChan: return resp, nil } } // GetStatusInfo retrieves the status info for the node. func (cache *snapshotCache) GetStatusInfo(node string) StatusInfo { cache.mu.RLock() defer cache.mu.RUnlock() info, exists := cache.status[node] if !exists { if cache.log != nil { cache.log.Warnf("node does not exist") } return nil } return info } // GetStatusKeys retrieves all node IDs in the status map. func (cache *snapshotCache) GetStatusKeys() []string { cache.mu.RLock() defer cache.mu.RUnlock() out := make([]string, 0, len(cache.status)) for id := range cache.status { out = append(out, id) } return out } // NodeHash computes string identifiers for Envoy nodes. type NodeHash interface { // ID function defines a unique string identifier for the remote Envoy node. ID(node *envoy_config_core_v3.Node) string } // IDHash uses ID field as the node hash. type IDHash struct{} // ID uses the node ID field func (IDHash) ID(node *envoy_config_core_v3.Node) string { if node == nil { return "" } return node.Id } var _ NodeHash = IDHash{} // StatusInfo tracks the server state for the remote Envoy node. 
// Not all fields are used by all envoy_cache implementations. type StatusInfo interface { // GetNode returns the node metadata. GetNode() *envoy_config_core_v3.Node // GetNumWatches returns the number of open watches. GetNumWatches() int // GetLastWatchRequestTime returns the timestamp of the last discovery watch request. GetLastWatchRequestTime() time.Time } type statusInfo struct { // node is the constant Envoy node metadata. node *envoy_config_core_v3.Node // watches are indexed channels for the response watches and the original requests. watches map[int64]ResponseWatch // the timestamp of the last watch request lastWatchRequestTime time.Time // mutex to protect the status fields. // should not acquire mutex of the parent envoy_cache after acquiring this mutex. mu sync.RWMutex } // ResponseWatch is a watch record keeping both the request and an open channel for the response. type ResponseWatch struct { // Request is the original request for the watch. Request *envoy_cache.Request // Response is the channel to push responses to. Response chan envoy_cache.Response } // newStatusInfo initializes a status info data structure. func newStatusInfo(node *envoy_config_core_v3.Node) *statusInfo { out := statusInfo{ node: node, watches: make(map[int64]ResponseWatch), } return &out } func (info *statusInfo) GetNode() *envoy_config_core_v3.Node { info.mu.RLock() defer info.mu.RUnlock() return info.node } func (info *statusInfo) GetNumWatches() int { info.mu.RLock() defer info.mu.RUnlock() return len(info.watches) } func (info *statusInfo) GetLastWatchRequestTime() time.Time { info.mu.RLock() defer info.mu.RUnlock() return info.lastWatchRequestTime }