grpc-xds/control-plane-go/pkg/server/server.go (256 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package server import ( "context" "fmt" "net" "time" clusterv3 "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3" discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" endpointv3 "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3" listenerv3 "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3" routev3 "github.com/envoyproxy/go-control-plane/envoy/service/route/v3" runtimev3 "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3" secretv3 "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3" serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/go-logr/logr" "google.golang.org/grpc" "google.golang.org/grpc/admin" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/tls/certprovider" "google.golang.org/grpc/credentials/tls/certprovider/pemfile" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/reflection" "google.golang.org/grpc/security/advancedtls" "google.golang.org/protobuf/encoding/protojson" "github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/informers" "github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/interceptors" 
"github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/logging"
"github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/xds"
"github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/xds/eds"
)

// gRPC configuration based on https://github.com/envoyproxy/go-control-plane/blob/v0.11.1/internal/example/server.go
const (
	grpcKeepaliveTime        = 30 * time.Second
	grpcKeepaliveTimeout     = 5 * time.Second
	grpcKeepaliveMinTime     = 30 * time.Second
	grpcMaxConcurrentStreams = 1000000
)

// transportCredentials bundles gRPC transport credentials with the
// certificate providers that back them, so that the providers can be
// released via Close once the credentials are no longer in use.
type transportCredentials struct {
	credentials.TransportCredentials
	// providers may be empty, e.g. when insecure credentials are used.
	providers []certprovider.Provider
}

// Close cleans up resources used by the credentials by closing every
// non-nil certificate provider.
func (c *transportCredentials) Close() {
	for _, provider := range c.providers {
		if provider != nil {
			provider.Close()
		}
	}
}

// Run starts the xDS control plane management server and blocks until the
// health gRPC server stops serving.
//
// Two gRPC servers are created:
//   - the serving server on servingPort, configured with the transport
//     credentials from createServerCredentials and the options from
//     serverOptions, exposing the xDS discovery services, the gRPC health
//     service, the admin services (Channelz and CSDS), and reflection;
//   - the health server on healthPort, created with default server options,
//     exposing the gRPC health service, the admin services, and reflection.
//
// A Kubernetes informer manager is created for each entry in kubecontexts,
// using the xDS snapshot cache. Cancelling ctx starts shutting the servers
// down via addServerStopBehavior. If the serving server fails, the error is
// logged and the health status is set to NOT_SERVING; the health server keeps
// running.
func Run(ctx context.Context, servingPort int, healthPort int, kubecontexts []informers.Kubecontext, xdsFeatures *xds.Features, authority string) error {
	logger := logging.FromContext(ctx)
	serverCredentials, err := createServerCredentials(logger, xdsFeatures)
	if err != nil {
		return fmt.Errorf("could not create server-side transport credentials: %w", err)
	}
	defer serverCredentials.Close()

	grpcOptions := serverOptions(logger, serverCredentials)
	server := grpc.NewServer(grpcOptions...)
	healthGRPCServer := grpc.NewServer()
	healthServer := health.NewServer()
	addServerStopBehavior(ctx, logger, server, healthGRPCServer, healthServer)

	healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
	healthpb.RegisterHealthServer(server, healthServer)
	healthpb.RegisterHealthServer(healthGRPCServer, healthServer)

	cleanup, err := registerAdminServers(server, healthGRPCServer)
	if err != nil {
		return fmt.Errorf("could not register gRPC Channelz and CSDS admin services: %w", err)
	}
	defer cleanup()

	reflection.Register(server)
	reflection.Register(healthGRPCServer)

	xdsCache := xds.NewSnapshotCache(ctx, true, xds.ZoneHash{}, eds.LocalityPriorityByZone{}, xdsFeatures, authority)
	xdsServer := serverv3.NewServer(ctx, xdsCache, xdsServerCallbackFuncs(logger))
	registerXDSServices(server, xdsServer)

	if err := createInformers(ctx, logger, kubecontexts, xdsCache); err != nil {
		return fmt.Errorf("could not create Kubernetes informer managers: %w", err)
	}

	tcpListener, err := net.Listen("tcp", fmt.Sprintf(":%d", servingPort))
	if err != nil {
		return fmt.Errorf("could not create TCP listener on port=%d: %w", servingPort, err)
	}
	healthTCPListener, err := net.Listen("tcp", fmt.Sprintf(":%d", healthPort))
	if err != nil {
		// The serving listener is not handed to a server yet, so nobody else
		// will close it. Best effort: the listen error is the one to report.
		_ = tcpListener.Close()
		return fmt.Errorf("could not create TCP listener on port=%d: %w", healthPort, err)
	}

	logger.V(1).Info("xDS control plane management server listening", "port", servingPort, "healthPort", healthPort)
	go func() {
		// Per the grpc-go docs, Serve returns a non-nil error unless Stop or
		// GracefulStop was called, so a non-nil error here is a real failure.
		if err := server.Serve(tcpListener); err != nil {
			logger.Error(err, "xDS control plane management server stopped serving")
			healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
		}
	}()
	return healthGRPCServer.Serve(healthTCPListener)
}

// registerAdminServers registers the gRPC admin services (Channelz and CSDS)
// on both the serving and the health gRPC servers.
//
// On success, the returned function releases the resources allocated by both
// registrations. On error, anything registered so far has already been
// cleaned up and the returned function is a no-op.
func registerAdminServers(servingGRPCServer *grpc.Server, healthGRPCServer *grpc.Server) (func(), error) {
	cleanupServing, err := admin.Register(servingGRPCServer)
	if err != nil {
		return func() {}, fmt.Errorf("could not register Channelz and CSDS admin services to serving server: %w", err)
	}
	cleanupHealth, err := admin.Register(healthGRPCServer)
	if err != nil {
		// The caller never receives cleanupServing on this path, so release
		// the first registration's resources here to avoid leaking them.
		cleanupServing()
		return func() {}, fmt.Errorf("could not register Channelz and CSDS admin services to health server: %w", err)
	}
	return func() {
		cleanupServing()
		cleanupHealth()
	}, nil
}

// xdsServerCallbackFuncs returns xDS server callbacks that log every
// discovery request and every resource in each discovery response.
// Response resources are logged one at a time as multi-line JSON.
func xdsServerCallbackFuncs(logger logr.Logger) *serverv3.CallbackFuncs {
	// The marshal options never change, so build them once rather than on
	// every response. MarshalOptions.Marshal has a value receiver, so sharing
	// this value across concurrent streams is safe.
	protoMarshalOptions := protojson.MarshalOptions{
		Multiline:    true,
		Indent:       " ",
		AllowPartial: true,
	}
	return &serverv3.CallbackFuncs{
		StreamRequestFunc: func(streamID int64, request *discoveryv3.DiscoveryRequest) error {
			// Use the generated getters, which are nil-safe.
			logger.Info("StreamRequest", "streamID", streamID, "type", request.GetTypeUrl(), "resourceNames", request.GetResourceNames())
			return nil
		},
		StreamResponseFunc: func(_ context.Context, streamID int64, _ *discoveryv3.DiscoveryRequest, response *discoveryv3.DiscoveryResponse) {
			for _, anyResource := range response.GetResources() {
				if anyResource == nil {
					continue
				}
				protoResource, err := anyResource.UnmarshalNew()
				if err != nil {
					logger.Error(err, "StreamResponse: could not unmarshall Any message")
					continue
				}
				jsonResourceBytes, err := protoMarshalOptions.Marshal(protoResource)
				if err != nil {
					logger.Error(err, "StreamResponse: could not marshall proto message to JSON")
					continue
				}
				// Logging each resource instead of a slice of resources, to take advantage of multi-line logging,
				// which is helpful for development and exploration.
				logger.Info("StreamResponse", "streamID", streamID, "type", response.GetTypeUrl(), "resource", string(jsonResourceBytes))
			}
		},
	}
}

// registerXDSServices registers xdsServer as the implementation of the
// aggregated (ADS) and the individual xDS discovery services on grpcServer.
func registerXDSServices(grpcServer *grpc.Server, xdsServer serverv3.Server) {
	discoveryv3.RegisterAggregatedDiscoveryServiceServer(grpcServer, xdsServer)
	endpointv3.RegisterEndpointDiscoveryServiceServer(grpcServer, xdsServer)
	clusterv3.RegisterClusterDiscoveryServiceServer(grpcServer, xdsServer)
	routev3.RegisterRouteDiscoveryServiceServer(grpcServer, xdsServer)
	listenerv3.RegisterListenerDiscoveryServiceServer(grpcServer, xdsServer)
	secretv3.RegisterSecretDiscoveryServiceServer(grpcServer, xdsServer)
	runtimev3.RegisterRuntimeDiscoveryServiceServer(grpcServer, xdsServer)
}

// createInformers creates a Kubernetes informer manager for each kubecontext,
// using xdsCache, and adds an EndpointSlice informer for each informer
// configured on that kubecontext. It stops at the first error.
func createInformers(ctx context.Context, logger logr.Logger, kubecontexts []informers.Kubecontext, xdsCache *xds.SnapshotCache) error {
	for _, kubecontext := range kubecontexts {
		informerManager, err := informers.NewManager(ctx, kubecontext.Context, xdsCache)
		if err != nil {
			return fmt.Errorf("could not create Kubernetes informer manager for context=%s: %w", kubecontext.Context, err)
		}
		for _, informer := range kubecontext.Informers {
			if err := informerManager.AddEndpointSliceInformer(ctx, logger, informer); err != nil {
				return fmt.Errorf("could not create Kubernetes informer for context=%s for %+v: %w", kubecontext.Context, informer, err)
			}
		}
	}
	return nil
}

// serverOptions sets gRPC server options.
//
// The gRPC Go library sets a very small upper bound for the number of gRPC/h2
// streams over a single TCP connection. If a proxy multiplexes requests over
// a single connection to the management server, then it might lead to
// availability problems.
//
// Keepalive timeouts based on connection_keepalive parameter https://www.envoyproxy.io/docs/envoy/latest/configuration/overview/examples#dynamic // Source: https://github.com/envoyproxy/go-control-plane/blob/v0.11.1/internal/example/server.go#L67 func serverOptions(logger logr.Logger, transportCredentials credentials.TransportCredentials) []grpc.ServerOption { return []grpc.ServerOption{ grpc.ChainStreamInterceptor(interceptors.StreamServerLogging(logger)), grpc.ChainUnaryInterceptor(interceptors.UnaryServerLogging(logger)), grpc.Creds(transportCredentials), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: grpcKeepaliveMinTime, PermitWithoutStream: true, }), grpc.KeepaliveParams(keepalive.ServerParameters{ Time: grpcKeepaliveTime, Timeout: grpcKeepaliveTimeout, }), grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams), } } func createServerCredentials(logger logr.Logger, xdsFeatures *xds.Features) (*transportCredentials, error) { if !xdsFeatures.EnableControlPlaneTLS { logger.V(2).Info("using insecure credentials for the control plane server") return &transportCredentials{ TransportCredentials: insecure.NewCredentials(), }, nil } logger.V(2).Info("using mTLS with automatic certificate reloading for the control plane server") identityOptions := pemfile.Options{ CertFile: "/var/run/secrets/workload-spiffe-credentials/certificates.pem", KeyFile: "/var/run/secrets/workload-spiffe-credentials/private_key.pem", RefreshDuration: 600 * time.Second, } identityProvider, err := pemfile.NewProvider(identityOptions) if err != nil { return nil, fmt.Errorf("could not create a new certificate provider for identityOptions=%+v: %w", identityOptions, err) } providers := []certprovider.Provider{identityProvider} options := &advancedtls.Options{ IdentityOptions: advancedtls.IdentityCertificateOptions{ IdentityProvider: identityProvider, }, AdditionalPeerVerification: func(params *advancedtls.HandshakeVerificationInfo) (*advancedtls.PostHandshakeVerificationResults, 
error) { // Not actually checking anything, just logging the client's SPIFFE ID. // SPIFFE certificates must have exactly one URI SAN. if len(params.Leaf.URIs) == 1 && params.Leaf.URIs[0] != nil { logger.V(2).Info("Client TLS certificate", "spiffeID", *params.Leaf.URIs[0]) } return &advancedtls.PostHandshakeVerificationResults{}, nil }, RequireClientCert: false, VerificationType: advancedtls.CertVerification, } if xdsFeatures.RequireControlPlaneClientCerts { rootOptions := pemfile.Options{ RootFile: "/var/run/secrets/workload-spiffe-credentials/ca_certificates.pem", RefreshDuration: 600 * time.Second, } rootProvider, err := pemfile.NewProvider(rootOptions) if err != nil { return nil, fmt.Errorf("could not create a new certificate provider for rootOptions=%+v: %w", rootOptions, err) } providers = append(providers, rootProvider) options.RootOptions = advancedtls.RootCertificateOptions{ RootProvider: rootProvider, } options.RequireClientCert = true } logger.Info("advancedtls", "options", options) serverCredentials, err := advancedtls.NewServerCreds(options) if err != nil { return nil, fmt.Errorf("could not create server credentials from options %+v: %w", options, err) } return &transportCredentials{ TransportCredentials: serverCredentials, providers: providers, }, err } func addServerStopBehavior(ctx context.Context, logger logr.Logger, servingGRPCServer *grpc.Server, healthGRPCServer *grpc.Server, healthServer *health.Server) { go func() { <-ctx.Done() healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING) stopped := make(chan struct{}) go func() { logger.Info("Attempting to gracefully stop the xDS management server") servingGRPCServer.GracefulStop() close(stopped) }() t := time.NewTimer(5 * time.Second) select { case <-t.C: logger.Info("Stopping the xDS management server immediately") servingGRPCServer.Stop() healthGRPCServer.Stop() case <-stopped: t.Stop() } }() }