pkg/ingress/config/kingress_config.go (458 lines of code) (raw):

// Copyright (c) 2022 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package config import ( "sync" networking "istio.io/api/networking/v1alpha3" istiomodel "istio.io/istio/pilot/pkg/model" "istio.io/istio/pkg/cluster" "istio.io/istio/pkg/config" "istio.io/istio/pkg/config/constants" "istio.io/istio/pkg/config/schema/collection" "istio.io/istio/pkg/config/schema/gvk" "istio.io/istio/pkg/config/schema/kind" "istio.io/istio/pkg/util/sets" v1 "k8s.io/api/core/v1" listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "github.com/alibaba/higress/pkg/ingress/kube/annotations" "github.com/alibaba/higress/pkg/ingress/kube/common" "github.com/alibaba/higress/pkg/ingress/kube/kingress" "github.com/alibaba/higress/pkg/ingress/kube/secret" "github.com/alibaba/higress/pkg/ingress/kube/util" . "github.com/alibaba/higress/pkg/ingress/log" "github.com/alibaba/higress/pkg/kube" "github.com/alibaba/higress/registry/reconcile" ) var ( _ istiomodel.ConfigStoreController = &KIngressConfig{} _ istiomodel.IngressStore = &KIngressConfig{} ) type KIngressConfig struct { remoteIngressControllers map[cluster.ID]common.KIngressController mutex sync.RWMutex ingressRouteCache istiomodel.IngressRouteCollection ingressDomainCache istiomodel.IngressDomainCollection localKubeClient kube.Client virtualServiceHandlers []istiomodel.EventHandler gatewayHandlers []istiomodel.EventHandler envoyFilterHandlers []istiomodel.EventHandler watchErrorHandler cache.WatchErrorHandler cachedEnvoyFilters []config.Config watchedSecretSet sets.Set[string] RegistryReconciler *reconcile.Reconciler XDSUpdater istiomodel.XDSUpdater annotationHandler annotations.AnnotationHandler globalGatewayName string namespace string clusterId cluster.ID } func NewKIngressConfig(localKubeClient kube.Client, XDSUpdater istiomodel.XDSUpdater, namespace string, options common.Options) *KIngressConfig { if localKubeClient.KIngressInformer() == nil { return nil } clusterId := options.ClusterId if clusterId == "Kubernetes" { clusterId = "" } config := &KIngressConfig{ remoteIngressControllers: make(map[cluster.ID]common.KIngressController), localKubeClient: localKubeClient, XDSUpdater: XDSUpdater, annotationHandler: annotations.NewAnnotationHandlerManager(), clusterId: clusterId, globalGatewayName: namespace + "/" + common.CreateConvertedName(clusterId.String(), "global"), watchedSecretSet: sets.New[string](), namespace: namespace, } return config } func (m *KIngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f istiomodel.EventHandler) { IngressLog.Infof("register resource %v", kind) switch kind { case gvk.VirtualService: m.virtualServiceHandlers = append(m.virtualServiceHandlers, f) case gvk.Gateway: m.gatewayHandlers = append(m.gatewayHandlers, f) case gvk.EnvoyFilter: m.envoyFilterHandlers = append(m.envoyFilterHandlers, f) } for _, remoteIngressController := range m.remoteIngressControllers { remoteIngressController.RegisterEventHandler(kind, f) } } func (m *KIngressConfig) AddLocalCluster(options common.Options) common.KIngressController { secretController := secret.NewController(m.localKubeClient, options) secretController.AddEventHandler(m.ReflectSecretChanges) var ingressController common.KIngressController ingressController = kingress.NewController(m.localKubeClient, m.localKubeClient, options, secretController) m.remoteIngressControllers[options.ClusterId] = ingressController return ingressController } func (m *KIngressConfig) List(typ config.GroupVersionKind, namespace string) []config.Config { if typ == gvk.EnvoyFilter || typ == gvk.DestinationRule || typ == gvk.WasmPlugin || typ == gvk.ServiceEntry { return nil } if typ != gvk.Gateway && typ != gvk.VirtualService { return nil } // Currently, only support list all namespaces gateways or virtualservices. if namespace != "" { IngressLog.Warnf("ingress store only support type %s of all namespace.", typ) return nil } var configs []config.Config m.mutex.RLock() for _, ingressController := range m.remoteIngressControllers { configs = append(configs, ingressController.List()...) } m.mutex.RUnlock() common.SortIngressByCreationTime(configs) wrapperConfigs := m.createWrapperConfigs(configs) IngressLog.Infof("resource type %s, configs number %d", typ, len(wrapperConfigs)) switch typ { case gvk.Gateway: return m.convertGateways(wrapperConfigs) case gvk.VirtualService: return m.convertVirtualService(wrapperConfigs) } return nil } func (m *KIngressConfig) createWrapperConfigs(configs []config.Config) []common.WrapperConfig { var wrapperConfigs []common.WrapperConfig // Init global context clusterSecretListers := map[cluster.ID]listersv1.SecretLister{} clusterServiceListers := map[cluster.ID]listersv1.ServiceLister{} m.mutex.RLock() for clusterId, controller := range m.remoteIngressControllers { clusterSecretListers[clusterId] = controller.SecretLister() clusterServiceListers[clusterId] = controller.ServiceLister() } m.mutex.RUnlock() globalContext := &annotations.GlobalContext{ WatchedSecrets: sets.New[string](), ClusterSecretLister: clusterSecretListers, ClusterServiceList: clusterServiceListers, } for idx := range configs { rawConfig := configs[idx] annotationsConfig := &annotations.Ingress{ Meta: annotations.Meta{ Namespace: rawConfig.Namespace, Name: rawConfig.Name, RawClusterId: common.GetRawClusterId(rawConfig.Annotations), ClusterId: common.GetClusterId(rawConfig.Annotations), }, } _ = m.annotationHandler.Parse(rawConfig.Annotations, annotationsConfig, globalContext) wrapperConfigs = append(wrapperConfigs, common.WrapperConfig{ Config: &rawConfig, AnnotationsConfig: annotationsConfig, }) } m.mutex.Lock() m.watchedSecretSet = globalContext.WatchedSecrets m.mutex.Unlock() return wrapperConfigs } func (m *KIngressConfig) convertGateways(configs []common.WrapperConfig) []config.Config { convertOptions := common.ConvertOptions{ IngressDomainCache: common.NewIngressDomainCache(), Gateways: map[string]*common.WrapperGateway{}, } for idx := range configs { cfg := configs[idx] clusterId := common.GetClusterId(cfg.Config.Annotations) m.mutex.RLock() ingressController := m.remoteIngressControllers[clusterId] m.mutex.RUnlock() if ingressController == nil { continue } if err := ingressController.ConvertGateway(&convertOptions, &cfg); err != nil { IngressLog.Errorf("Convert ingress %s/%s to gateway fail in cluster %s, err %v", cfg.Config.Namespace, cfg.Config.Name, clusterId, err) } } // apply annotation for _, wrapperGateway := range convertOptions.Gateways { m.annotationHandler.ApplyGateway(wrapperGateway.Gateway, wrapperGateway.WrapperConfig.AnnotationsConfig) } m.mutex.Lock() m.ingressDomainCache = convertOptions.IngressDomainCache.Extract() m.mutex.Unlock() out := make([]config.Config, 0, len(convertOptions.Gateways)) for _, gateway := range convertOptions.Gateways { cleanHost := common.CleanHost(gateway.Host) out = append(out, config.Config{ Meta: config.Meta{ GroupVersionKind: gvk.Gateway, Name: common.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost), Namespace: m.namespace, Annotations: map[string]string{ common.ClusterIdAnnotation: gateway.ClusterId.String(), common.HostAnnotation: gateway.Host, }, }, Spec: gateway.Gateway, }) } return out } func (m *KIngressConfig) convertVirtualService(configs []common.WrapperConfig) []config.Config { convertOptions := common.ConvertOptions{ IngressRouteCache: common.NewIngressRouteCache(), VirtualServices: map[string]*common.WrapperVirtualService{}, HTTPRoutes: map[string][]*common.WrapperHTTPRoute{}, Route2Ingress: map[string]*common.WrapperConfigWithRuleKey{}, } // convert http route for idx := range configs { cfg := configs[idx] clusterId := common.GetClusterId(cfg.Config.Annotations) m.mutex.RLock() ingressController := m.remoteIngressControllers[clusterId] m.mutex.RUnlock() if ingressController == nil { continue } if err := ingressController.ConvertHTTPRoute(&convertOptions, &cfg); err != nil { IngressLog.Errorf("Convert ingress %s/%s to HTTP route fail in cluster %s, err %v", cfg.Config.Namespace, cfg.Config.Name, clusterId, err) } } // Apply annotation on routes for _, routes := range convertOptions.HTTPRoutes { for _, route := range routes { m.annotationHandler.ApplyRoute(route.HTTPRoute, route.WrapperConfig.AnnotationsConfig) } } // Normalize weighted cluster to make sure the sum of weight is 100. for _, host := range convertOptions.HTTPRoutes { for _, route := range host { normalizeWeightedKCluster(convertOptions.IngressRouteCache, route) } } // Apply annotation on virtual services Only IP-control and do nothing for _, virtualService := range convertOptions.VirtualServices { m.annotationHandler.ApplyVirtualServiceHandler(virtualService.VirtualService, virtualService.WrapperConfig.AnnotationsConfig) } // Apply app root for per host. m.applyAppRoot(&convertOptions) // Apply internal active redirect for error page. m.applyInternalActiveRedirect(&convertOptions) m.mutex.Lock() m.ingressRouteCache = convertOptions.IngressRouteCache.Extract() m.mutex.Unlock() // Convert http route to virtual service out := make([]config.Config, 0, len(convertOptions.HTTPRoutes)) for host, routes := range convertOptions.HTTPRoutes { if len(routes) == 0 { continue } cleanHost := common.CleanHost(host) // namespace/name, name format: (istio cluster id)-host gateways := []string{m.namespace + "/" + common.CreateConvertedName(m.clusterId.String(), cleanHost), common.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost)} wrapperVS, exist := convertOptions.VirtualServices[host] if !exist { IngressLog.Warnf("virtual service for host %s does not exist.", host) } vs := wrapperVS.VirtualService vs.Gateways = gateways for _, route := range routes { vs.Http = append(vs.Http, route.HTTPRoute) } firstRoute := routes[0] out = append(out, config.Config{ Meta: config.Meta{ GroupVersionKind: gvk.VirtualService, Name: common.CreateConvertedName(constants.IstioIngressGatewayName, firstRoute.WrapperConfig.Config.Namespace, firstRoute.WrapperConfig.Config.Name, cleanHost), Namespace: m.namespace, Annotations: map[string]string{ common.ClusterIdAnnotation: firstRoute.ClusterId.String(), }, }, Spec: vs, }) } return out } // Make sure that the sum of traffic split ratio is 100, if it is not 100, it will be normalized func normalizeWeightedKCluster(cache *common.IngressRouteCache, route *common.WrapperHTTPRoute) { if len(route.HTTPRoute.Route) == 1 { route.HTTPRoute.Route[0].Weight = 100 return } var weightTotal int32 = 0 for _, routeDestination := range route.HTTPRoute.Route { weightTotal += routeDestination.Weight } var sum int32 for idx, routeDestination := range route.HTTPRoute.Route { if idx == 0 { continue } weight := float32(routeDestination.Weight) / float32(weightTotal) routeDestination.Weight = int32(weight * 100) sum += routeDestination.Weight } route.HTTPRoute.Route[0].Weight = 100 - sum // Update the recorded status in ingress builder if cache != nil { cache.Update(route) } } func (m *KIngressConfig) applyAppRoot(convertOptions *common.ConvertOptions) { for host, wrapVS := range convertOptions.VirtualServices { if wrapVS.AppRoot != "" { route := &common.WrapperHTTPRoute{ HTTPRoute: &networking.HTTPRoute{ Name: common.CreateConvertedName(host, "app-root"), Match: []*networking.HTTPMatchRequest{ { Uri: &networking.StringMatch{ MatchType: &networking.StringMatch_Exact{ Exact: "/", }, }, }, }, Redirect: &networking.HTTPRedirect{ RedirectCode: 302, Uri: wrapVS.AppRoot, }, }, WrapperConfig: wrapVS.WrapperConfig, ClusterId: wrapVS.WrapperConfig.AnnotationsConfig.ClusterId, } convertOptions.HTTPRoutes[host] = append([]*common.WrapperHTTPRoute{route}, convertOptions.HTTPRoutes[host]...) } } } func (m *KIngressConfig) applyInternalActiveRedirect(convertOptions *common.ConvertOptions) { for host, routes := range convertOptions.HTTPRoutes { var tempRoutes []*common.WrapperHTTPRoute for _, route := range routes { tempRoutes = append(tempRoutes, route) if route.HTTPRoute.InternalActiveRedirect != nil { fallbackConfig := route.WrapperConfig.AnnotationsConfig.Fallback if fallbackConfig == nil { continue } typedNamespace := fallbackConfig.DefaultBackend internalRedirectRoute := route.HTTPRoute.DeepCopy() internalRedirectRoute.Name = internalRedirectRoute.Name + annotations.FallbackRouteNameSuffix internalRedirectRoute.InternalActiveRedirect = nil internalRedirectRoute.Match = []*networking.HTTPMatchRequest{ { Uri: &networking.StringMatch{ MatchType: &networking.StringMatch_Exact{ Exact: "/", }, }, Headers: map[string]*networking.StringMatch{ annotations.FallbackInjectHeaderRouteName: { MatchType: &networking.StringMatch_Exact{ Exact: internalRedirectRoute.Name, }, }, annotations.FallbackInjectFallbackService: { MatchType: &networking.StringMatch_Exact{ Exact: typedNamespace.String(), }, }, }, }, } internalRedirectRoute.Route = []*networking.HTTPRouteDestination{ { Destination: &networking.Destination{ Host: util.CreateServiceFQDN(typedNamespace.Namespace, typedNamespace.Name), Port: &networking.PortSelector{ Number: fallbackConfig.Port, }, }, Weight: 100, }, } tempRoutes = append([]*common.WrapperHTTPRoute{{ HTTPRoute: internalRedirectRoute, WrapperConfig: route.WrapperConfig, ClusterId: route.ClusterId, }}, tempRoutes...) } } convertOptions.HTTPRoutes[host] = tempRoutes } } func (m *KIngressConfig) ReflectSecretChanges(clusterNamespacedName util.ClusterNamespacedName) { var hit bool m.mutex.RLock() if m.watchedSecretSet.Contains(clusterNamespacedName.String()) { hit = true } m.mutex.RUnlock() if hit { push := func(gvk config.GroupVersionKind) { m.XDSUpdater.ConfigUpdate(&istiomodel.PushRequest{ Full: true, ConfigsUpdated: map[istiomodel.ConfigKey]struct{}{{ Kind: kind.MustFromGVK(gvk), Name: clusterNamespacedName.Name, Namespace: clusterNamespacedName.Namespace, }: {}}, Reason: istiomodel.NewReasonStats("auth-secret-change"), }) } push(gvk.VirtualService) push(gvk.EnvoyFilter) } } func (m *KIngressConfig) Run(stop <-chan struct{}) { for _, remoteIngressController := range m.remoteIngressControllers { _ = remoteIngressController.SetWatchErrorHandler(m.watchErrorHandler) go remoteIngressController.Run(stop) } } func (m *KIngressConfig) HasSynced() bool { IngressLog.Info("In Kingress Synced.") m.mutex.RLock() defer m.mutex.RUnlock() for _, remoteIngressController := range m.remoteIngressControllers { IngressLog.Info("In Kingress Synced.") if !remoteIngressController.HasSynced() { return false } } IngressLog.Info("KIngress config controller synced.") return true } func (m *KIngressConfig) SetWatchErrorHandler(f func(r *cache.Reflector, err error)) error { m.watchErrorHandler = f return nil } func (m *KIngressConfig) GetIngressRoutes() istiomodel.IngressRouteCollection { m.mutex.RLock() defer m.mutex.RUnlock() return m.ingressRouteCache } func (m *KIngressConfig) GetIngressDomains() istiomodel.IngressDomainCollection { m.mutex.RLock() defer m.mutex.RUnlock() return m.ingressDomainCache } func (m *KIngressConfig) CheckIngress(clusterName string) istiomodel.CheckIngressResponse { return istiomodel.CheckIngressResponse{} } func (m *KIngressConfig) Services(clusterName string) ([]*v1.Service, error) { return nil, nil } func (m *KIngressConfig) IngressControllers() map[string]string { return nil } func (m *KIngressConfig) Schemas() collection.Schemas { return common.IngressIR } func (m *KIngressConfig) Get(config.GroupVersionKind, string, string) *config.Config { return nil } func (m *KIngressConfig) Create(config.Config) (revision string, err error) { return "", common.ErrUnsupportedOp } func (m *KIngressConfig) Update(config.Config) (newRevision string, err error) { return "", common.ErrUnsupportedOp } func (m *KIngressConfig) UpdateStatus(config.Config) (newRevision string, err error) { return "", common.ErrUnsupportedOp } func (m *KIngressConfig) Patch(config.Config, config.PatchFunc) (string, error) { return "", common.ErrUnsupportedOp } func (m *KIngressConfig) Delete(config.GroupVersionKind, string, string, *string) error { return common.ErrUnsupportedOp }