grpc-xds/control-plane-go/pkg/xds/snapshot_builder.go (193 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package xds import ( "fmt" "strconv" "time" "github.com/envoyproxy/go-control-plane/pkg/cache/types" cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/applications" "github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/xds/cds" "github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/xds/eds" "github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/xds/lds" "github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/xds/rds" ) // SnapshotBuilder builds xDS resource snapshots for the cache. type SnapshotBuilder struct { listeners map[string]types.Resource routeConfigurations map[string]types.Resource clusters map[string]types.Resource clusterLoadAssignments map[string]types.Resource endpointsByCluster map[string][]applications.ApplicationEndpoints grpcServerListenerAddresses map[EndpointAddress]bool nodeHash string localityPriorityMapper eds.LocalityPriorityMapper features *Features authority string } // NewSnapshotBuilder initializes the builder. func NewSnapshotBuilder(nodeHash string, localityPriorityMapper eds.LocalityPriorityMapper, features *Features, authority string) *SnapshotBuilder { return &SnapshotBuilder{ listeners: make(map[string]types.Resource), routeConfigurations: make(map[string]types.Resource), clusters: make(map[string]types.Resource), clusterLoadAssignments: make(map[string]types.Resource), endpointsByCluster: make(map[string][]applications.ApplicationEndpoints), grpcServerListenerAddresses: make(map[EndpointAddress]bool), nodeHash: nodeHash, localityPriorityMapper: localityPriorityMapper, features: features, authority: authority, } } // AddGRPCApplications adds the provided application configurations to the xDS resource snapshot. func (b *SnapshotBuilder) AddGRPCApplications(apps []applications.Application) (*SnapshotBuilder, error) { for _, app := range apps { if b.listeners[app.Name] == nil { apiListener, err := lds.CreateAPIListener(app.Name, app.Name) if err != nil { return nil, fmt.Errorf("could not create LDS API listener for gRPC application %+v: %w", app, err) } b.listeners[apiListener.Name] = apiListener if b.features.EnableFederation { xdstpListenerName := xdstpListener(b.authority, app.Name) xdstpRouteConfigurationName := xdstpRouteConfiguration(b.authority, app.Name) xdstpListener, err := lds.CreateAPIListener(xdstpListenerName, xdstpRouteConfigurationName) if err != nil { return nil, fmt.Errorf("could not create federation LDS API listener for authority=%s and gRPC application %+v: %w", b.authority, app, err) } b.listeners[xdstpListener.Name] = xdstpListener } } if b.routeConfigurations[app.Name] == nil { routeConfiguration := rds.CreateRouteConfigurationForAPIListener(app.Name, app.Name, app.PathPrefix, app.Name) b.routeConfigurations[routeConfiguration.Name] = routeConfiguration if b.features.EnableFederation { xdstpRouteConfigurationName := xdstpRouteConfiguration(b.authority, app.Name) xdstpClusterName := xdstpCluster(b.authority, app.Name) xdstpRouteConfiguration := rds.CreateRouteConfigurationForAPIListener(xdstpRouteConfigurationName, app.Name, app.PathPrefix, xdstpClusterName) b.routeConfigurations[xdstpRouteConfiguration.Name] = xdstpRouteConfiguration } } if b.clusters[app.Name] == nil { cluster, err := cds.CreateCluster( app.Name, app.Name, app.Namespace, app.ServiceAccountName, app.HealthCheckPort, app.HealthCheckProtocol, "", b.features.EnableDataPlaneTLS, b.features.RequireDataPlaneClientCerts) if err != nil { return nil, fmt.Errorf("could not create CDS Cluster for gRPC application %+v: %w", app, err) } b.clusters[cluster.Name] = cluster if b.features.EnableFederation { xdstpClusterName := xdstpCluster(b.authority, app.Name) xdstpEDSServiceName := xdstpEdsService(b.authority, app.Name) xdstpCluster, err := cds.CreateCluster( xdstpClusterName, xdstpEDSServiceName, app.Namespace, app.ServiceAccountName, app.HealthCheckPort, app.HealthCheckProtocol, "", b.features.EnableDataPlaneTLS, b.features.RequireDataPlaneClientCerts) if err != nil { return nil, fmt.Errorf("could not create federation CDS Cluster for authority=%s and gRPC application %+v: %w", b.authority, app, err) } b.clusters[xdstpCluster.Name] = xdstpCluster } } // Merge endpoints from multiple informers for the same app: endpointsByClusterKey := fmt.Sprintf("%s-%d", app.Name, app.ServingPort) b.endpointsByCluster[endpointsByClusterKey] = append(b.endpointsByCluster[endpointsByClusterKey], app.Endpoints...) clusterLoadAssignment := eds.CreateClusterLoadAssignment(app.Name, app.ServingPort, b.nodeHash, b.localityPriorityMapper, b.endpointsByCluster[endpointsByClusterKey]) b.clusterLoadAssignments[clusterLoadAssignment.ClusterName] = clusterLoadAssignment if b.features.EnableFederation { xdstpEDSServiceName := xdstpEdsService(b.authority, app.Name) xdstpClusterLoadAssignment := eds.CreateClusterLoadAssignment(xdstpEDSServiceName, app.ServingPort, b.nodeHash, b.localityPriorityMapper, b.endpointsByCluster[endpointsByClusterKey]) b.clusterLoadAssignments[xdstpClusterLoadAssignment.ClusterName] = xdstpClusterLoadAssignment } } return b, nil } func xdstpListener(authority string, listenerName string) string { return fmt.Sprintf("xdstp://%s/envoy.config.listener.v3.Listener/%s", authority, listenerName) } func xdstpRouteConfiguration(authority string, routeConfigurationName string) string { return fmt.Sprintf("xdstp://%s/envoy.config.route.v3.RouteConfiguration/%s", authority, routeConfigurationName) } func xdstpCluster(authority string, clusterName string) string { return fmt.Sprintf("xdstp://%s/envoy.config.cluster.v3.Cluster/%s", authority, clusterName) } func xdstpEdsService(authority string, serviceName string) string { return fmt.Sprintf("xdstp://%s/envoy.config.endpoint.v3.ClusterLoadAssignment/%s", authority, serviceName) } // AddGRPCServerListenerAddresses adds server listeners and associated route // configurations with the provided IP addresses and ports to the snapshot. func (b *SnapshotBuilder) AddGRPCServerListenerAddresses(addresses []EndpointAddress) *SnapshotBuilder { for _, address := range addresses { b.grpcServerListenerAddresses[address] = true } return b } // Build adds the server listeners and route configuration for the node hash, and then builds the snapshot. func (b *SnapshotBuilder) Build() (cachev3.ResourceSnapshot, error) { for address := range b.grpcServerListenerAddresses { serverListener, err := lds.CreateGRPCServerListener(address.Host, address.Port, b.features.EnableDataPlaneTLS, b.features.RequireDataPlaneClientCerts, b.features.EnableRBAC) if err != nil { return nil, fmt.Errorf("could not create LDS server Listener for address %s:%d: %w", address.Host, address.Port, err) } b.listeners[serverListener.Name] = serverListener } if len(b.grpcServerListenerAddresses) > 0 { routeConfigurationForGRPCServerListener, err := rds.CreateRouteConfigurationForGRPCServerListener(b.features.EnableRBAC) if err != nil { return nil, fmt.Errorf("could not create RDS RouteConfiguration for LDS server Listener: %w", err) } b.routeConfigurations[routeConfigurationForGRPCServerListener.Name] = routeConfigurationForGRPCServerListener } // Envoy proxies will not accept the gRPC server Listeners, because all the routes in their RouteConfigurations // specify `NonForwardingAction` as the action. // Envoy proxies will also not accept the API Listeners created for gRPC clients, because Envoy proxies can only // have at most one API Listener defined, and that API Listener must be a static resource (not fetched via xDS). // TODO: Add gRPC-JSON transcoding and gRPC HTTP/1.1 bridge. // https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/grpc_json_transcoder_filter // https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/grpc_http1_bridge_filter envoyGRPCListener, err := lds.CreateEnvoyGRPCListener(50051, true) if err != nil { return nil, fmt.Errorf("could not create LDS Listener for Envoy proxy receiving gRPC requests: %w", err) } b.listeners[envoyGRPCListener.Name] = envoyGRPCListener var clusterNames []string for clusterName := range b.clusters { clusterNames = append(clusterNames, clusterName) } routeConfigurationForEnvoyGRPCListener, err := rds.CreateRouteConfigurationForEnvoyGRPCListener(clusterNames) if err != nil { return nil, fmt.Errorf("could not create RDS RouteConfiguration for Envoy proxy gRPC LDS Listener: %w", err) } b.routeConfigurations[routeConfigurationForEnvoyGRPCListener.Name] = routeConfigurationForEnvoyGRPCListener listenerResources := make([]types.Resource, len(b.listeners)) i := 0 for _, listener := range b.listeners { listenerResources[i] = listener i++ } routeConfigurationResources := make([]types.Resource, len(b.routeConfigurations)) j := 0 for _, routeConfiguration := range b.routeConfigurations { routeConfigurationResources[j] = routeConfiguration j++ } clusterResources := make([]types.Resource, len(b.clusters)) k := 0 for _, cluster := range b.clusters { clusterResources[k] = cluster k++ } clusterLoadAssignmentResources := make([]types.Resource, len(b.clusterLoadAssignments)) l := 0 for _, clusterLoadAssignment := range b.clusterLoadAssignments { clusterLoadAssignmentResources[l] = clusterLoadAssignment l++ } version := strconv.FormatInt(time.Now().UnixNano(), 10) return cachev3.NewSnapshot(version, map[resource.Type][]types.Resource{ resource.ListenerType: listenerResources, resource.RouteType: routeConfigurationResources, resource.ClusterType: clusterResources, resource.EndpointType: clusterLoadAssignmentResources, }) }