pkg/bootstrap/server.go (429 lines of code) (raw):

// Copyright (c) 2022 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package bootstrap import ( "fmt" "net" "net/http" "time" prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "istio.io/api/mesh/v1alpha1" configaggregate "istio.io/istio/pilot/pkg/config/aggregate" "istio.io/istio/pilot/pkg/features" istiogrpc "istio.io/istio/pilot/pkg/grpc" "istio.io/istio/pilot/pkg/model" "istio.io/istio/pilot/pkg/server" "istio.io/istio/pilot/pkg/serviceregistry/aggregate" kubecontroller "istio.io/istio/pilot/pkg/serviceregistry/kube/controller" "istio.io/istio/pilot/pkg/xds" "istio.io/istio/pkg/cluster" "istio.io/istio/pkg/config" "istio.io/istio/pkg/config/constants" "istio.io/istio/pkg/config/mesh" "istio.io/istio/pkg/config/schema/collections" "istio.io/istio/pkg/config/schema/gvk" "istio.io/istio/pkg/config/schema/kind" "istio.io/istio/pkg/keepalive" istiokube "istio.io/istio/pkg/kube" "istio.io/istio/pkg/log" "istio.io/istio/pkg/security" "istio.io/istio/security/pkg/server/ca/authenticate" "istio.io/istio/security/pkg/server/ca/authenticate/kubeauth" "istio.io/pkg/ledger" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "github.com/alibaba/higress/pkg/cert" higressconfig "github.com/alibaba/higress/pkg/config" "github.com/alibaba/higress/pkg/ingress/kube/common" "github.com/alibaba/higress/pkg/ingress/mcp" "github.com/alibaba/higress/pkg/ingress/translation" higresskube "github.com/alibaba/higress/pkg/kube" ) type XdsOptions struct { // DebounceAfter is the delay added to events to wait after a registry/config event for debouncing. // This will delay the push by at least this interval, plus the time getting subsequent events. If no change is // detected the push will happen, otherwise we'll keep delaying until things settle. DebounceAfter time.Duration // DebounceMax is the maximum time to wait for events while debouncing. Defaults to 10 seconds. If events keep // showing up with no break for this time, we'll trigger a push. DebounceMax time.Duration // EnableEDSDebounce indicates whether EDS pushes should be debounced. EnableEDSDebounce bool // KeepConfigLabels indicates whether to keep all the labels when converting configs to xDS resources. KeepConfigLabels bool // KeepConfigAnnotations indicates whether to keep all the annotations when converting configs to xDS resources. KeepConfigAnnotations bool } // RegistryOptions provide configuration options for the configuration controller. If FileDir is set, that directory will // be monitored for CRD yaml files and will update the controller as those files change (This is used for testing // purposes). Otherwise, a CRD client is created based on the configuration. type RegistryOptions struct { // If FileDir is set, the below kubernetes options are ignored FileDir string Registries []string // Kubernetes controller options KubeOptions kubecontroller.Options // ClusterRegistriesNamespace specifies where the multi-cluster secret resides ClusterRegistriesNamespace string KubeConfig string // DistributionTracking control DistributionCacheRetention time.Duration // DistributionTracking control DistributionTrackingEnabled bool } type ServerArgs struct { Debug bool MeshId string RegionId string NativeIstio bool HttpAddress string GrpcAddress string // IngressClass filters which ingress resources the higress controller watches. // The default ingress class is higress. // There are some special cases for special ingress class. // 1. When the ingress class is set as nginx, the higress controller will watch ingress // resources with the nginx ingress class or without any ingress class. // 2. When the ingress class is set empty, the higress controller will watch all ingress // resources in the k8s cluster. IngressClass string EnableStatus bool WatchNamespace string GrpcKeepAliveOptions *keepalive.Options XdsOptions XdsOptions RegistryOptions RegistryOptions KeepStaleWhenEmpty bool GatewaySelectorKey string GatewaySelectorValue string GatewayHttpPort uint32 GatewayHttpsPort uint32 EnableAutomaticHttps bool AutomaticHttpsEmail string CertHttpAddress string } type readinessProbe func() (bool, error) type ServerInterface interface { Start(stop <-chan struct{}) error WaitUntilCompletion() } type Server struct { *ServerArgs environment *model.Environment kubeClient higresskube.Client configController model.ConfigStoreController configStores []model.ConfigStoreController httpServer *http.Server httpMux *http.ServeMux grpcServer *grpc.Server xdsServer *xds.DiscoveryServer server server.Instance readinessProbes map[string]readinessProbe certServer *cert.Server } func NewServer(args *ServerArgs) (*Server, error) { e := model.NewEnvironment() e.DomainSuffix = constants.DefaultClusterLocalDomain e.SetLedger(buildLedger(args.RegistryOptions)) ac := aggregate.NewController(aggregate.Options{ MeshHolder: e, }) e.ServiceDiscovery = ac s := &Server{ ServerArgs: args, httpMux: http.NewServeMux(), environment: e, readinessProbes: make(map[string]readinessProbe), server: server.New(), } s.environment.Watcher = mesh.NewFixedWatcher(&v1alpha1.MeshConfig{}) s.environment.Init() initFuncList := []func() error{ s.initKubeClient, s.initXdsServer, s.initHttpServer, s.initConfigController, s.initRegistryEventHandlers, s.initAuthenticators, s.initAutomaticHttps, } for _, f := range initFuncList { if err := f(); err != nil { return nil, err } } s.server.RunComponent("kube-client", func(stop <-chan struct{}) error { s.kubeClient.RunAndWait(stop) return nil }) s.readinessProbes["xds"] = func() (bool, error) { return s.xdsServer.IsServerReady(), nil } return s, nil } // initRegistryEventHandlers sets up event handlers for config updates func (s *Server) initRegistryEventHandlers() error { log.Info("initializing registry event handlers") configHandler := func(prev config.Config, curr config.Config, event model.Event) { // For update events, trigger push only if spec has changed. pushReq := &model.PushRequest{ Full: true, ConfigsUpdated: map[model.ConfigKey]struct{}{{ Kind: kind.MustFromGVK(curr.GroupVersionKind), Name: curr.Name, Namespace: curr.Namespace, }: {}}, Reason: model.NewReasonStats(model.ConfigUpdate), } s.xdsServer.ConfigUpdate(pushReq) } schemas := common.IngressIR.All() for _, schema := range schemas { s.configController.RegisterEventHandler(schema.GroupVersionKind(), configHandler) } return nil } func (s *Server) initConfigController() error { ns := higressconfig.PodNamespace options := common.Options{ Enable: true, ClusterId: s.RegistryOptions.KubeOptions.ClusterID, IngressClass: s.IngressClass, WatchNamespace: s.WatchNamespace, EnableStatus: s.EnableStatus, SystemNamespace: higressconfig.PodNamespace, GatewaySelectorKey: s.GatewaySelectorKey, GatewaySelectorValue: s.GatewaySelectorValue, GatewayHttpPort: s.GatewayHttpPort, GatewayHttpsPort: s.GatewayHttpsPort, } if options.ClusterId == "Kubernetes" { options.ClusterId = "" } ingressConfig := translation.NewIngressTranslation(s.kubeClient, s.xdsServer, ns, options) ingressConfig.AddLocalCluster(options) s.configStores = append(s.configStores, ingressConfig) // Wrap the config controller with a cache. aggregateConfigController, err := configaggregate.MakeCache(s.configStores) if err != nil { return err } s.configController = aggregateConfigController // Create the config store. s.environment.ConfigStore = aggregateConfigController // s.environment.IngressStore = ingressConfig // Defer starting the controller until after the service is created. s.server.RunComponent("config-controller", func(stop <-chan struct{}) error { go s.configController.Run(stop) return nil }) return nil } func (s *Server) Start(stop <-chan struct{}) error { if err := s.server.Start(stop); err != nil { return err } if !s.waitForCacheSync(stop) { return fmt.Errorf("failed to sync cache") } // Inform Discovery Server so that it can start accepting connections. s.xdsServer.CachesSynced() grpcListener, err := net.Listen("tcp", s.GrpcAddress) if err != nil { return err } go func() { log.Infof("starting gRPC discovery service at %s", grpcListener.Addr()) if err := s.grpcServer.Serve(grpcListener); err != nil { log.Errorf("error serving GRPC server: %v", err) } }() httpListener, err := net.Listen("tcp", s.HttpAddress) if err != nil { return err } go func() { log.Infof("starting HTTP service at %s", httpListener.Addr()) if err := s.httpServer.Serve(httpListener); err != nil { log.Errorf("error serving http server: %v", err) } }() if s.EnableAutomaticHttps { go func() { log.Infof("starting Automatic Cert HTTP service at %s", s.CertHttpAddress) if err := s.certServer.Run(stop); err != nil { log.Errorf("error serving Automatic Cert HTTP server: %v", err) } }() } s.waitForShutDown(stop) return nil } func (s *Server) waitForShutDown(stop <-chan struct{}) { go func() { <-stop stopped := make(chan struct{}) go func() { // Some grpcServer implementations do not support GracefulStop. Unfortunately, this is not // exposed; they just panic. To avoid this, we will recover and do a standard Stop when its not // support. defer func() { if r := recover(); r != nil { s.grpcServer.Stop() close(stopped) } }() s.grpcServer.GracefulStop() close(stopped) }() timer := time.NewTimer(time.Second * 2) select { case <-timer.C: s.grpcServer.Stop() case <-stopped: timer.Stop() } s.xdsServer.Shutdown() }() } func (s *Server) WaitUntilCompletion() { s.server.Wait() } func (s *Server) initXdsServer() error { log.Info("init xds server") s.xdsServer = xds.NewDiscoveryServer(s.environment, higressconfig.PodName, cluster.ID(higressconfig.PodNamespace), s.RegistryOptions.KubeOptions.ClusterAliases) generatorOptions := mcp.GeneratorOptions{KeepConfigLabels: s.XdsOptions.KeepConfigLabels, KeepConfigAnnotations: s.XdsOptions.KeepConfigAnnotations} s.xdsServer.Generators[gvk.WasmPlugin.String()] = &mcp.WasmPluginGenerator{Environment: s.environment, Server: s.xdsServer, GeneratorOptions: generatorOptions} s.xdsServer.Generators[gvk.DestinationRule.String()] = &mcp.DestinationRuleGenerator{Environment: s.environment, Server: s.xdsServer, GeneratorOptions: generatorOptions} s.xdsServer.Generators[gvk.EnvoyFilter.String()] = &mcp.EnvoyFilterGenerator{Environment: s.environment, Server: s.xdsServer, GeneratorOptions: generatorOptions} s.xdsServer.Generators[gvk.Gateway.String()] = &mcp.GatewayGenerator{Environment: s.environment, Server: s.xdsServer, GeneratorOptions: generatorOptions} s.xdsServer.Generators[gvk.VirtualService.String()] = &mcp.VirtualServiceGenerator{Environment: s.environment, Server: s.xdsServer, GeneratorOptions: generatorOptions} s.xdsServer.Generators[gvk.ServiceEntry.String()] = &mcp.ServiceEntryGenerator{Environment: s.environment, Server: s.xdsServer, GeneratorOptions: generatorOptions} for _, schema := range collections.Pilot.All() { gvk := schema.GroupVersionKind().String() if _, ok := s.xdsServer.Generators[gvk]; !ok { s.xdsServer.Generators[gvk] = &mcp.FallbackGenerator{Environment: s.environment, Server: s.xdsServer} } } s.xdsServer.ProxyNeedsPush = func(proxy *model.Proxy, req *model.PushRequest) bool { return true } s.server.RunComponent("xds-server", func(stop <-chan struct{}) error { log.Infof("Starting ADS server") s.xdsServer.Start(stop) return nil }) return s.initGrpcServer() } func (s *Server) initGrpcServer() error { interceptors := []grpc.UnaryServerInterceptor{ // setup server prometheus monitoring (as final interceptor in chain) prometheus.UnaryServerInterceptor, } grpcOptions := istiogrpc.ServerOptions(s.GrpcKeepAliveOptions, interceptors...) s.grpcServer = grpc.NewServer(grpcOptions...) s.xdsServer.Register(s.grpcServer) reflection.Register(s.grpcServer) return nil } func (s *Server) initAuthenticators() error { authenticators := []security.Authenticator{ &authenticate.ClientCertAuthenticator{}, } authenticators = append(authenticators, kubeauth.NewKubeJWTAuthenticator(s.environment.Watcher, s.kubeClient.Kube(), s.RegistryOptions.KubeOptions.ClusterID, nil, features.JwtPolicy)) if features.XDSAuth { s.xdsServer.Authenticators = authenticators } return nil } func (s *Server) initAutomaticHttps() error { certOption := &cert.Option{ Namespace: higressconfig.PodNamespace, ServerAddress: s.CertHttpAddress, Email: s.AutomaticHttpsEmail, } certServer, err := cert.NewServer(s.kubeClient.Kube(), s.xdsServer, certOption) if err != nil { return err } s.certServer = certServer log.Infof("init cert default config") s.certServer.InitDefaultConfig() if !s.EnableAutomaticHttps { log.Info("automatic https is disabled") return nil } return s.certServer.InitServer() } func (s *Server) initKubeClient() error { if s.kubeClient != nil { // Already initialized by startup arguments return nil } kubeRestConfig, err := istiokube.DefaultRestConfig(s.RegistryOptions.KubeConfig, "", func(config *rest.Config) { config.QPS = s.RegistryOptions.KubeOptions.KubernetesAPIQPS config.Burst = s.RegistryOptions.KubeOptions.KubernetesAPIBurst }) if err != nil { return fmt.Errorf("failed creating kube config: %v", err) } s.kubeClient, err = higresskube.NewClient(istiokube.NewClientConfigForRestConfig(kubeRestConfig), "higress") if err != nil { return fmt.Errorf("failed creating kube client: %v", err) } s.kubeClient = higresskube.EnableCrdWatcher(s.kubeClient) return nil } func (s *Server) initHttpServer() error { s.httpServer = &http.Server{ Addr: s.HttpAddress, Handler: s.httpMux, IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout ReadTimeout: 30 * time.Second, } s.xdsServer.AddDebugHandlers(s.httpMux, nil, true, nil) s.httpMux.HandleFunc("/ready", s.readyHandler) s.httpMux.HandleFunc("/registry/watcherStatus", s.registryWatcherStatusHandler) return nil } // readyHandler checks whether the http server is ready func (s *Server) readyHandler(w http.ResponseWriter, _ *http.Request) { for name, fn := range s.readinessProbes { if ready, err := fn(); !ready { log.Warnf("%s is not ready: %v", name, err) w.WriteHeader(http.StatusServiceUnavailable) return } } w.WriteHeader(http.StatusOK) } func (s *Server) registryWatcherStatusHandler(w http.ResponseWriter, _ *http.Request) { ingressTranslation, ok := s.environment.IngressStore.(*translation.IngressTranslation) if !ok { http.Error(w, "IngressStore not found", http.StatusNotFound) return } ingressConfig := ingressTranslation.GetIngressConfig() if ingressConfig == nil { http.Error(w, "IngressConfig not found", http.StatusNotFound) return } registryReconciler := ingressConfig.RegistryReconciler if registryReconciler == nil { http.Error(w, "RegistryReconciler not found", http.StatusNotFound) return } watcherStatusList := registryReconciler.GetRegistryWatcherStatusList() writeJSON(w, watcherStatusList) } func writeJSON(w http.ResponseWriter, obj interface{}) { w.Header().Set("Content-Type", "application/json") b, err := config.ToJSON(obj) if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = w.Write([]byte(err.Error())) return } _, err = w.Write(b) if err != nil { w.WriteHeader(http.StatusInternalServerError) } } // cachesSynced checks whether caches have been synced. func (s *Server) cachesSynced() bool { return s.configController.HasSynced() } func (s *Server) waitForCacheSync(stop <-chan struct{}) bool { start := time.Now() log.Info("Waiting for caches to be synced") if !cache.WaitForCacheSync(stop, s.cachesSynced) { log.Errorf("Failed waiting for cache sync") return false } log.Infof("All controller caches have been synced up in %v", time.Since(start)) // At this point, we know that all update events of the initial state-of-the-world have been // received. We wait to ensure we have committed at least this many updates. This avoids a race // condition where we are marked ready prior to updating the push context, leading to incomplete // pushes. expected := s.xdsServer.InboundUpdates.Load() if !cache.WaitForCacheSync(stop, func() bool { return s.pushContextReady(expected) }) { log.Errorf("Failed waiting for push context initialization") return false } return true } // pushContextReady indicates whether pushcontext has processed all inbound config updates. func (s *Server) pushContextReady(expected int64) bool { committed := s.xdsServer.CommittedUpdates.Load() if committed < expected { log.Debugf("Waiting for pushcontext to process inbound updates, inbound: %v, committed : %v", expected, committed) return false } return true } func buildLedger(ca RegistryOptions) ledger.Ledger { var result ledger.Ledger if ca.DistributionTrackingEnabled { result = ledger.Make(ca.DistributionCacheRetention) } else { result = &model.DisabledLedger{} } return result }