grpc-xds/control-plane-go/pkg/xds/snapshot_cache.go (137 lines of code) (raw):
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xds
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
streamv3 "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
"github.com/go-logr/logr"
"github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/applications"
"github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/logging"
"github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/xds/eds"
"github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/xds/lds"
)
// Server listener resource names typically follow the template `grpc/server?xds.resource.listening_address=%s`.
// serverListenerNamePrefix is the part up to and including the `=` sign.
var serverListenerNamePrefix = strings.SplitAfter(lds.GRPCServerListenerResourceNameTemplate, "=")[0]
// SnapshotCache stores snapshots of xDS resources in a delegate cache.
//
// It handles server listener requests by intercepting Listener stream creation, see `CreateWatch()`.
// Server listeners addresses from these requests are kept in a map, keyed by the node hash,
// and with a set of addresses per node hash.
//
// It also handles propagating snapshots to all node hashes in the cache.
type SnapshotCache struct {
ctx context.Context
logger logr.Logger
// delegate is the actual xDS resource cache.
delegate cachev3.SnapshotCache
// hash is the function to determine the cache key (`nodeHash`) for nodes.
hash cachev3.NodeHash
// localityPriorityMapper constructs a priority map for localities, to be used in EDS ClusterLoadAssignment resources.
localityPriorityMapper eds.LocalityPriorityMapper
// appsCache stores the most recent gRPC application configuration information from k8s cluster EndpointSlices.
// The appsCache is used to populate new entries (previously unseen `nodeHash`es) in the xDS resource snapshot cache,
// so that the new subscribers don't have to wait for an EndpointSlice update before they can receive xDS resources.
appsCache *applications.ApplicationCache
// grpcServerListenerCache stores known server listener names for each snapshot cache key (`nodeHash`).
// These names are captured when new Listener streams are created, see `CreateWatch()`.
// The server listener names are added to xDS resource snapshots, to be included in LDS responded for xDS-enabled gRPC servers.
grpcServerListenerCache *GRPCServerListenerCache
// features contains flags to enable and disable xDS features, e.g., mTLS.
features *Features
// authority is the authority name of this control plane for xDS federation.
authority string
}
var _ cachev3.Cache = &SnapshotCache{}
// NewSnapshotCache creates an xDS resource cache for the provided node hash function.
//
// If `allowPartialRequests` is true, the DiscoveryServer will respond to requests for a resource
// type even if some resources in the snapshot are not named in the request.
func NewSnapshotCache(ctx context.Context, allowPartialRequests bool, hash cachev3.NodeHash, localityPriorityMapper eds.LocalityPriorityMapper, features *Features, authority string) *SnapshotCache {
return &SnapshotCache{
ctx: ctx,
logger: logging.FromContext(ctx),
delegate: cachev3.NewSnapshotCache(!allowPartialRequests, hash, logging.SnapshotCacheLogger(ctx)),
hash: hash,
localityPriorityMapper: localityPriorityMapper,
appsCache: applications.NewApplicationCache(),
grpcServerListenerCache: NewGRPCServerListenerCache(),
features: features,
authority: authority,
}
}
// CreateWatch intercepts stream creation before delegating, and if it is a request for Listener
// (LDS) resources stream, does the following:
//
// - Extracts addresses and ports of any server listeners in the request and adds them to the
// set of known server listener socket addresses for the node hash.
// - If there is no existing snapshot, or if the request contained new and previously unseen
// server listener addresses the node hash, creates a new snapshot for that node hash,
// with the server listeners and associated route configuration.
//
// This solves bootstrapping of xDS resources snapshots for xDS-enabled gRPC servers and
// Envoy proxy instances that fetch configuration dynamically using ADS.
func (c *SnapshotCache) CreateWatch(request *cachev3.Request, state streamv3.StreamState, responses chan cachev3.Response) (cancel func()) {
if isListenerRequest(request) {
c.logger.Info("CreateWatch",
"typeUrl", request.TypeUrl,
"resourceNames", request.ResourceNames,
"node.cluster", request.Node.Cluster,
"node.user_agent_name", request.Node.UserAgentName,
"node.id", request.Node.Id)
nodeHash := c.hash.ID(request.GetNode())
addressesFromRequest, err := findServerListenerAddresses(request.ResourceNames)
if err != nil {
c.logger.Error(err, "Problem encountered when looking for server listener addresses in new Listener stream request", "nodeHash", nodeHash)
return func() {}
}
changes := c.grpcServerListenerCache.Add(nodeHash, addressesFromRequest)
existingSnapshot, err := c.delegate.GetSnapshot(nodeHash)
if err != nil || existingSnapshot == nil || changes {
apps := c.appsCache.GetAll()
if err := c.createNewSnapshot(nodeHash, apps); err != nil {
c.logger.Error(err, "Could not set new xDS resource snapshot", "nodeHash", nodeHash, "apps", apps)
return func() {}
}
}
}
return c.delegate.CreateWatch(request, state, responses)
}
// UpdateResources creates a new snapshot for each node hash in the cache,
// based on the provided gRPC application configuration,
// with the addition of server listeners and their associated route configurations.
func (c *SnapshotCache) UpdateResources(_ context.Context, logger logr.Logger, kubecontextName string, namespace string, updatedApps []applications.Application) error {
var errs []error
changed := c.appsCache.Put(kubecontextName, namespace, updatedApps)
if !changed {
logger.V(2).Info("No application updates, so not generating new xDS resource snapshots")
return nil
}
apps := c.appsCache.GetAll()
logger.V(2).Info("Application updates, generating new xDS resource snapshots", "apps", apps)
for _, nodeHash := range c.delegate.GetStatusKeys() {
if err := c.createNewSnapshot(nodeHash, apps); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}
// createNewSnapshot sets a new snapshot for the provided `nodeHash` and gRPC application configuration.
func (c *SnapshotCache) createNewSnapshot(nodeHash string, apps []applications.Application) error {
c.logger.Info("Creating a new snapshot", "nodeHash", nodeHash, "apps", apps)
snapshotBuilder, err := NewSnapshotBuilder(nodeHash, c.localityPriorityMapper, c.features, c.authority).AddGRPCApplications(apps)
if err != nil {
return fmt.Errorf("could not create xDS resource snapshot builder for nodeHash=%s: %w", nodeHash, err)
}
snapshot, err := snapshotBuilder.
AddGRPCServerListenerAddresses(c.grpcServerListenerCache.Get(nodeHash)).
Build()
if err != nil {
return fmt.Errorf("could not create new xDS resource snapshot for nodeHash=%s: %w", nodeHash, err)
}
if err := c.delegate.SetSnapshot(c.ctx, nodeHash, snapshot); err != nil {
return fmt.Errorf("could not set new xDS resource snapshot for nodeHash=%s: %w", nodeHash, err)
}
return nil
}
// isListenerRequest determines if the request is a request for Listener (LDS) resources.
func isListenerRequest(request *cachev3.Request) bool {
return request != nil &&
(len(request.ResourceNames) > 0 || request.Node.UserAgentName == "envoy") &&
request.GetTypeUrl() == resourcev3.ListenerType
}
// findServerListenerAddresses looks for server Listener names in the provided
// slice and extracts the address and port for each server Listener found.
// TODO: Handle xDS federation server Listener names using `xdstp://` names,
// e.g., "xdstp://xds-authority.example.com/envoy.config.listener.v3.Listener/grpc/server/%s"
func findServerListenerAddresses(names []string) ([]EndpointAddress, error) {
var addresses []EndpointAddress
for _, name := range names {
if strings.HasPrefix(name, serverListenerNamePrefix) && len(name) > len(serverListenerNamePrefix) {
hostPort := strings.SplitAfter(name, serverListenerNamePrefix)[1]
host, portStr, err := net.SplitHostPort(hostPort)
if err != nil {
return nil, fmt.Errorf("could not extract host and port from server Listener name=%s: %w", name, err)
}
port, err := strconv.ParseUint(portStr, 10, 32)
if err != nil {
return nil, fmt.Errorf("could not extract port from server Listener name: %w", err)
}
addresses = append(addresses, EndpointAddress{
Host: host,
Port: uint32(port),
})
}
}
return addresses, nil
}
// CreateDeltaWatch just delegates, since gRPC does not support delta/incremental xDS currently.
// TODO: Handle request for gRPC server Listeners once gRPC implementation support delta/incremental xDS.
func (c *SnapshotCache) CreateDeltaWatch(request *cachev3.DeltaRequest, state streamv3.StreamState, responses chan cachev3.DeltaResponse) (cancel func()) {
return c.delegate.CreateDeltaWatch(request, state, responses)
}
func (c *SnapshotCache) Fetch(ctx context.Context, request *cachev3.Request) (cachev3.Response, error) {
return c.delegate.Fetch(ctx, request)
}