pkg/ingress/config/ingress_config.go (1,559 lines of code) (raw):

// Copyright (c) 2022 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package config import ( "bytes" "encoding/json" "errors" "fmt" "sort" "strings" "sync" corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" wasm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/wasm/v3" httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/wasm/v3" "github.com/golang/protobuf/jsonpb" _struct "github.com/golang/protobuf/ptypes/struct" "github.com/golang/protobuf/ptypes/wrappers" "google.golang.org/protobuf/types/known/anypb" extensions "istio.io/api/extensions/v1alpha1" networking "istio.io/api/networking/v1alpha3" istiotype "istio.io/api/type/v1beta1" "istio.io/istio/pilot/pkg/features" istiomodel "istio.io/istio/pilot/pkg/model" "istio.io/istio/pilot/pkg/util/protoconv" "istio.io/istio/pkg/cluster" "istio.io/istio/pkg/config" "istio.io/istio/pkg/config/constants" "istio.io/istio/pkg/config/schema/collection" "istio.io/istio/pkg/config/schema/gvk" "istio.io/istio/pkg/config/schema/kind" "istio.io/istio/pkg/log" "istio.io/istio/pkg/util/sets" v1 "k8s.io/api/core/v1" listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" higressext "github.com/alibaba/higress/api/extensions/v1alpha1" higressv1 "github.com/alibaba/higress/api/networking/v1" extlisterv1 "github.com/alibaba/higress/client/pkg/listers/extensions/v1alpha1" netlisterv1 "github.com/alibaba/higress/client/pkg/listers/networking/v1" "github.com/alibaba/higress/pkg/cert" higressconst "github.com/alibaba/higress/pkg/config/constants" "github.com/alibaba/higress/pkg/ingress/kube/annotations" "github.com/alibaba/higress/pkg/ingress/kube/common" "github.com/alibaba/higress/pkg/ingress/kube/configmap" "github.com/alibaba/higress/pkg/ingress/kube/gateway" "github.com/alibaba/higress/pkg/ingress/kube/http2rpc" "github.com/alibaba/higress/pkg/ingress/kube/ingress" "github.com/alibaba/higress/pkg/ingress/kube/ingressv1" "github.com/alibaba/higress/pkg/ingress/kube/mcpbridge" "github.com/alibaba/higress/pkg/ingress/kube/secret" "github.com/alibaba/higress/pkg/ingress/kube/util" "github.com/alibaba/higress/pkg/ingress/kube/wasmplugin" . "github.com/alibaba/higress/pkg/ingress/log" "github.com/alibaba/higress/pkg/kube" "github.com/alibaba/higress/registry/memory" "github.com/alibaba/higress/registry/reconcile" ) var ( _ istiomodel.ConfigStoreController = &IngressConfig{} _ istiomodel.IngressStore = &IngressConfig{} Http2RpcMethodMap = func() map[string]string { return map[string]string{ "GET": "ALL_GET", "POST": "ALL_POST", "PUT": "ALL_PUT", "DELETE": "ALL_DELETE", "PATCH": "ALL_PATCH", } } Http2RpcParamSourceMap = func() map[string]string { return map[string]string{ "QUERY": "ALL_QUERY_PARAMETER", "HEADER": "ALL_HEADER", "PATH": "ALL_PATH", "BODY": "ALL_BODY", } } ) const ( DefaultMcpbridgeName = "default" ) type IngressConfig struct { remoteIngressControllers map[cluster.ID]common.IngressController remoteGatewayControllers map[cluster.ID]common.GatewayController mutex sync.RWMutex ingressRouteCache istiomodel.IngressRouteCollection ingressDomainCache istiomodel.IngressDomainCollection localKubeClient kube.Client virtualServiceHandlers []istiomodel.EventHandler gatewayHandlers []istiomodel.EventHandler destinationRuleHandlers []istiomodel.EventHandler envoyFilterHandlers []istiomodel.EventHandler serviceEntryHandlers []istiomodel.EventHandler wasmPluginHandlers []istiomodel.EventHandler watchErrorHandler cache.WatchErrorHandler cachedEnvoyFilters []config.Config watchedSecretSet sets.Set[string] RegistryReconciler *reconcile.Reconciler mcpbridgeController mcpbridge.McpBridgeController mcpbridgeLister netlisterv1.McpBridgeLister wasmPluginController wasmplugin.WasmPluginController wasmPluginLister extlisterv1.WasmPluginLister wasmPlugins map[string]*extensions.WasmPlugin http2rpcController http2rpc.Http2RpcController http2rpcLister netlisterv1.Http2RpcLister http2rpcs map[string]*higressv1.Http2Rpc configmapMgr *configmap.ConfigmapMgr XDSUpdater istiomodel.XDSUpdater annotationHandler annotations.AnnotationHandler globalGatewayName string namespace string clusterId cluster.ID httpsConfigMgr *cert.ConfigMgr commonOptions common.Options // templateProcessor processes template variables in config templateProcessor *TemplateProcessor // secretConfigMgr manages secret dependencies secretConfigMgr *SecretConfigMgr } // getSecretValue implements the getValue function for secret references func (m *IngressConfig) getSecretValue(valueType, namespace, name, key string) (string, error) { if valueType != "secret" { return "", fmt.Errorf("unsupported value type: %s", valueType) } m.mutex.RLock() defer m.mutex.RUnlock() for _, controller := range m.remoteIngressControllers { secret, err := controller.SecretLister().Secrets(namespace).Get(name) if err == nil { if value, exists := secret.Data[key]; exists { return string(value), nil } return "", fmt.Errorf("key %s not found in secret %s/%s", key, namespace, name) } } return "", fmt.Errorf("secret %s/%s not found", namespace, name) } func NewIngressConfig(localKubeClient kube.Client, xdsUpdater istiomodel.XDSUpdater, namespace string, options common.Options) *IngressConfig { clusterId := options.ClusterId if clusterId == "Kubernetes" { clusterId = "" } config := &IngressConfig{ remoteIngressControllers: make(map[cluster.ID]common.IngressController), remoteGatewayControllers: make(map[cluster.ID]common.GatewayController), localKubeClient: localKubeClient, XDSUpdater: xdsUpdater, annotationHandler: annotations.NewAnnotationHandlerManager(), clusterId: clusterId, globalGatewayName: namespace + "/" + common.CreateConvertedName(clusterId.String(), "global"), watchedSecretSet: sets.New[string](), namespace: namespace, wasmPlugins: make(map[string]*extensions.WasmPlugin), http2rpcs: make(map[string]*higressv1.Http2Rpc), commonOptions: options, } // Initialize secret config manager config.secretConfigMgr = NewSecretConfigMgr(xdsUpdater) // Initialize template processor with value getter function config.templateProcessor = NewTemplateProcessor(config.getSecretValue, namespace, config.secretConfigMgr) mcpbridgeController := mcpbridge.NewController(localKubeClient, options) mcpbridgeController.AddEventHandler(config.AddOrUpdateMcpBridge, config.DeleteMcpBridge) config.mcpbridgeController = mcpbridgeController config.mcpbridgeLister = mcpbridgeController.Lister() wasmPluginController := wasmplugin.NewController(localKubeClient, options) wasmPluginController.AddEventHandler(config.AddOrUpdateWasmPlugin, config.DeleteWasmPlugin) config.wasmPluginController = wasmPluginController config.wasmPluginLister = wasmPluginController.Lister() http2rpcController := http2rpc.NewController(localKubeClient, options) http2rpcController.AddEventHandler(config.AddOrUpdateHttp2Rpc, config.DeleteHttp2Rpc) config.http2rpcController = http2rpcController config.http2rpcLister = http2rpcController.Lister() higressConfigController := configmap.NewController(localKubeClient, clusterId, namespace) config.configmapMgr = configmap.NewConfigmapMgr(xdsUpdater, namespace, higressConfigController, higressConfigController.Lister()) httpsConfigMgr, _ := cert.NewConfigMgr(namespace, localKubeClient.Kube()) config.httpsConfigMgr = httpsConfigMgr return config } func (m *IngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f istiomodel.EventHandler) { IngressLog.Infof("register resource %v", kind) switch kind { case gvk.VirtualService: m.virtualServiceHandlers = append(m.virtualServiceHandlers, f) case gvk.Gateway: m.gatewayHandlers = append(m.gatewayHandlers, f) case gvk.DestinationRule: m.destinationRuleHandlers = append(m.destinationRuleHandlers, f) case gvk.EnvoyFilter: m.envoyFilterHandlers = append(m.envoyFilterHandlers, f) case gvk.ServiceEntry: m.serviceEntryHandlers = append(m.serviceEntryHandlers, f) case gvk.WasmPlugin: m.wasmPluginHandlers = append(m.wasmPluginHandlers, f) } for _, remoteIngressController := range m.remoteIngressControllers { remoteIngressController.RegisterEventHandler(kind, f) } for _, remoteGatewayController := range m.remoteGatewayControllers { remoteGatewayController.RegisterEventHandler(kind, f) } } func (m *IngressConfig) AddLocalCluster(options common.Options) { secretController := secret.NewController(m.localKubeClient, options) secretController.AddEventHandler(m.ReflectSecretChanges) secretController.AddEventHandler(m.secretConfigMgr.HandleSecretChange) var ingressController common.IngressController v1 := common.V1Available(m.localKubeClient) if !v1 { ingressController = ingress.NewController(m.localKubeClient, m.localKubeClient, options, secretController) } else { ingressController = ingressv1.NewController(m.localKubeClient, m.localKubeClient, options, secretController) } m.remoteIngressControllers[options.ClusterId] = ingressController if features.EnableGatewayAPI { m.remoteGatewayControllers[options.ClusterId] = gateway.NewController(m.localKubeClient, options) } } func (m *IngressConfig) List(typ config.GroupVersionKind, namespace string) []config.Config { if typ != gvk.Gateway && typ != gvk.VirtualService && typ != gvk.DestinationRule && typ != gvk.EnvoyFilter && typ != gvk.ServiceEntry && typ != gvk.WasmPlugin { return nil } var configs = make([]config.Config, 0) if configsFromIngress := m.listFromIngressControllers(typ, namespace); configsFromIngress != nil { // Process templates for ingress configs for i := range configsFromIngress { if err := m.templateProcessor.ProcessConfig(&configsFromIngress[i]); err != nil { IngressLog.Errorf("Failed to process template for config %s/%s: %v", configsFromIngress[i].Namespace, configsFromIngress[i].Name, err) } } configs = append(configs, configsFromIngress...) } if configsFromGateway := m.listFromGatewayControllers(typ, namespace); configsFromGateway != nil { // Process templates for gateway configs for i := range configsFromGateway { if err := m.templateProcessor.ProcessConfig(&configsFromGateway[i]); err != nil { IngressLog.Errorf("Failed to process template for config %s/%s: %v", configsFromGateway[i].Namespace, configsFromGateway[i].Name, err) } } configs = append(configs, configsFromGateway...) } return configs } func (m *IngressConfig) listFromIngressControllers(typ config.GroupVersionKind, namespace string) []config.Config { // Currently, only support list all namespaces gateways or virtualservices. if namespace != "" { IngressLog.Warnf("ingress store only support type %s of all namespace, request namespace: %s", typ, namespace) return nil } if typ == gvk.EnvoyFilter { m.mutex.RLock() defer m.mutex.RUnlock() var envoyFilters []config.Config // Build configmap envoy filters configmapEnvoyFilters, err := m.configmapMgr.ConstructEnvoyFilters() if err != nil { IngressLog.Errorf("Construct configmap EnvoyFilters error %v", err) } else { for _, envoyFilter := range configmapEnvoyFilters { envoyFilters = append(envoyFilters, *envoyFilter) } IngressLog.Infof("Append %d configmap EnvoyFilters", len(configmapEnvoyFilters)) } if len(envoyFilters) == 0 { IngressLog.Infof("resource type %s, configs number %d", typ, len(m.cachedEnvoyFilters)) return m.cachedEnvoyFilters } envoyFilters = append(envoyFilters, m.cachedEnvoyFilters...) IngressLog.Infof("resource type %s, configs number %d", typ, len(envoyFilters)) return envoyFilters } var configs []config.Config m.mutex.RLock() for _, ingressController := range m.remoteIngressControllers { configs = append(configs, ingressController.List()...) } m.mutex.RUnlock() common.SortIngressByCreationTime(configs) wrapperConfigs := m.createWrapperConfigs(configs) var result []config.Config switch typ { case gvk.Gateway: result = m.convertGateways(wrapperConfigs) case gvk.VirtualService: result = m.convertVirtualService(wrapperConfigs) case gvk.DestinationRule: result = m.convertDestinationRule(wrapperConfigs) case gvk.ServiceEntry: result = m.convertServiceEntry(wrapperConfigs) case gvk.WasmPlugin: result = m.convertWasmPlugin(wrapperConfigs) } IngressLog.Infof("resource type %s, ingress number %d, convert configs number %d", typ, len(configs), len(result)) return result } func (m *IngressConfig) listFromGatewayControllers(typ config.GroupVersionKind, namespace string) []config.Config { var configs []config.Config for _, gatewayController := range m.remoteGatewayControllers { if clusterConfigs := gatewayController.List(typ, namespace); clusterConfigs != nil { configs = append(configs, clusterConfigs...) } } return configs } func (m *IngressConfig) createWrapperConfigs(configs []config.Config) []common.WrapperConfig { var wrapperConfigs []common.WrapperConfig // Init global context clusterSecretListers := map[cluster.ID]listersv1.SecretLister{} clusterServiceListers := map[cluster.ID]listersv1.ServiceLister{} m.mutex.RLock() for clusterId, controller := range m.remoteIngressControllers { clusterSecretListers[clusterId] = controller.SecretLister() clusterServiceListers[clusterId] = controller.ServiceLister() } m.mutex.RUnlock() globalContext := &annotations.GlobalContext{ WatchedSecrets: sets.New[string](), ClusterSecretLister: clusterSecretListers, ClusterServiceList: clusterServiceListers, } for idx := range configs { rawConfig := configs[idx] annotationsConfig := &annotations.Ingress{ Meta: annotations.Meta{ Namespace: rawConfig.Namespace, Name: rawConfig.Name, RawClusterId: common.GetRawClusterId(rawConfig.Annotations), ClusterId: common.GetClusterId(rawConfig.Annotations), }, } _ = m.annotationHandler.Parse(rawConfig.Annotations, annotationsConfig, globalContext) wrapperConfigs = append(wrapperConfigs, common.WrapperConfig{ Config: &rawConfig, AnnotationsConfig: annotationsConfig, }) } m.mutex.Lock() m.watchedSecretSet = globalContext.WatchedSecrets m.mutex.Unlock() return wrapperConfigs } func (m *IngressConfig) convertGateways(configs []common.WrapperConfig) []config.Config { convertOptions := common.ConvertOptions{ IngressDomainCache: common.NewIngressDomainCache(), Gateways: map[string]*common.WrapperGateway{}, } httpsCredentialConfig, err := m.httpsConfigMgr.GetConfigFromConfigmap() if err != nil { IngressLog.Errorf("Get higress https configmap err %v", err) } for idx := range configs { cfg := configs[idx] clusterId := common.GetClusterId(cfg.Config.Annotations) m.mutex.RLock() ingressController := m.remoteIngressControllers[clusterId] m.mutex.RUnlock() if ingressController == nil { continue } if err := ingressController.ConvertGateway(&convertOptions, &cfg, httpsCredentialConfig); err != nil { IngressLog.Errorf("Convert ingress %s/%s to gateway fail in cluster %s, err %v", cfg.Config.Namespace, cfg.Config.Name, clusterId, err) } } // apply annotation for _, wrapperGateway := range convertOptions.Gateways { m.annotationHandler.ApplyGateway(wrapperGateway.Gateway, wrapperGateway.WrapperConfig.AnnotationsConfig) } m.mutex.Lock() m.ingressDomainCache = convertOptions.IngressDomainCache.Extract() m.mutex.Unlock() out := make([]config.Config, 0, len(convertOptions.Gateways)) for _, gateway := range convertOptions.Gateways { cleanHost := common.CleanHost(gateway.Host) out = append(out, config.Config{ Meta: config.Meta{ GroupVersionKind: gvk.Gateway, Name: common.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost), Namespace: m.namespace, Annotations: map[string]string{ common.ClusterIdAnnotation: gateway.ClusterId.String(), common.HostAnnotation: gateway.Host, }, }, Spec: gateway.Gateway, }) } return out } func (m *IngressConfig) convertVirtualService(configs []common.WrapperConfig) []config.Config { convertOptions := common.ConvertOptions{ IngressRouteCache: common.NewIngressRouteCache(), VirtualServices: map[string]*common.WrapperVirtualService{}, HTTPRoutes: map[string][]*common.WrapperHTTPRoute{}, Route2Ingress: map[string]*common.WrapperConfigWithRuleKey{}, } // convert http route for idx := range configs { cfg := configs[idx] clusterId := common.GetClusterId(cfg.Config.Annotations) m.mutex.RLock() ingressController := m.remoteIngressControllers[clusterId] m.mutex.RUnlock() if ingressController == nil { continue } if err := ingressController.ConvertHTTPRoute(&convertOptions, &cfg); err != nil { IngressLog.Errorf("Convert ingress %s/%s to HTTP route fail in cluster %s, err %v", cfg.Config.Namespace, cfg.Config.Name, clusterId, err) } } // Apply annotation on routes for _, routes := range convertOptions.HTTPRoutes { for _, route := range routes { m.annotationHandler.ApplyRoute(route.HTTPRoute, route.WrapperConfig.AnnotationsConfig) } } // Apply canary ingress if len(configs) > len(convertOptions.CanaryIngresses) { m.applyCanaryIngresses(&convertOptions) } // Normalize weighted cluster to make sure the sum of weight is 100. for _, host := range convertOptions.HTTPRoutes { for _, route := range host { normalizeWeightedCluster(convertOptions.IngressRouteCache, route) } } // Apply spec default backend. if convertOptions.HasDefaultBackend { for idx := range configs { cfg := configs[idx] clusterId := common.GetClusterId(cfg.Config.Annotations) m.mutex.RLock() ingressController := m.remoteIngressControllers[clusterId] m.mutex.RUnlock() if ingressController == nil { continue } if err := ingressController.ApplyDefaultBackend(&convertOptions, &cfg); err != nil { IngressLog.Errorf("Apply default backend on ingress %s/%s fail in cluster %s, err %v", cfg.Config.Namespace, cfg.Config.Name, clusterId, err) } } } // Apply annotation on virtual services for _, virtualService := range convertOptions.VirtualServices { m.annotationHandler.ApplyVirtualServiceHandler(virtualService.VirtualService, virtualService.WrapperConfig.AnnotationsConfig) } // Apply app root for per host. m.applyAppRoot(&convertOptions) // Apply internal active redirect for error page. m.applyInternalActiveRedirect(&convertOptions) m.mutex.Lock() m.ingressRouteCache = convertOptions.IngressRouteCache.Extract() m.mutex.Unlock() // Convert http route to virtual service out := make([]config.Config, 0, len(convertOptions.HTTPRoutes)) for host, routes := range convertOptions.HTTPRoutes { if len(routes) == 0 { continue } cleanHost := common.CleanHost(host) // namespace/name, name format: (istio cluster id)-host gateways := []string{m.namespace + "/" + common.CreateConvertedName(m.clusterId.String(), cleanHost), common.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost)} wrapperVS, exist := convertOptions.VirtualServices[host] if !exist { IngressLog.Warnf("virtual service for host %s does not exist.", host) } vs := wrapperVS.VirtualService vs.Gateways = gateways // Sort, exact -> prefix -> regex common.SortHTTPRoutes(routes) for _, route := range routes { vs.Http = append(vs.Http, route.HTTPRoute) } firstRoute := routes[0] out = append(out, config.Config{ Meta: config.Meta{ GroupVersionKind: gvk.VirtualService, Name: common.CreateConvertedName(constants.IstioIngressGatewayName, firstRoute.WrapperConfig.Config.Namespace, firstRoute.WrapperConfig.Config.Name, cleanHost), Namespace: m.namespace, Annotations: map[string]string{ common.ClusterIdAnnotation: firstRoute.ClusterId.String(), }, }, Spec: vs, }) } // add vs from naco3 for mcp server if m.RegistryReconciler != nil { allConfigsFromMcp := m.RegistryReconciler.GetAllConfigs(gvk.VirtualService) for _, cfg := range allConfigsFromMcp { out = append(out, *cfg) } } // We generate some specific envoy filter here to avoid duplicated computation. m.convertEnvoyFilter(&convertOptions) return out } func (m *IngressConfig) convertEnvoyFilter(convertOptions *common.ConvertOptions) { var envoyFilters []config.Config mappings := map[string]*common.Rule{} initHttp2RpcGlobalConfig := true for _, routes := range convertOptions.HTTPRoutes { for _, route := range routes { if strings.HasSuffix(route.HTTPRoute.Name, "app-root") { continue } http2rpc := route.WrapperConfig.AnnotationsConfig.Http2Rpc if http2rpc != nil { IngressLog.Infof("Found http2rpc for name %s", http2rpc.Name) envoyFilter, err := m.constructHttp2RpcEnvoyFilter(http2rpc, route, m.namespace, initHttp2RpcGlobalConfig) if err != nil { IngressLog.Infof("Construct http2rpc EnvoyFilter error %v", err) } else { IngressLog.Infof("Append http2rpc EnvoyFilter for name %s", http2rpc.Name) envoyFilters = append(envoyFilters, *envoyFilter) initHttp2RpcGlobalConfig = false } } auth := route.WrapperConfig.AnnotationsConfig.Auth if auth == nil { continue } key := auth.AuthSecret.String() + "/" + auth.AuthRealm if rule, exist := mappings[key]; !exist { mappings[key] = &common.Rule{ Realm: auth.AuthRealm, MatchRoute: []string{route.HTTPRoute.Name}, Credentials: auth.Credentials, Encrypted: true, } } else { rule.MatchRoute = append(rule.MatchRoute, route.HTTPRoute.Name) } } } IngressLog.Infof("Found %d number of basic auth", len(mappings)) if len(mappings) > 0 { rules := &common.BasicAuthRules{} for _, rule := range mappings { rules.Rules = append(rules.Rules, rule) } basicAuth, err := constructBasicAuthEnvoyFilter(rules, m.namespace) if err != nil { IngressLog.Errorf("Construct basic auth filter error %v", err) } else { envoyFilters = append(envoyFilters, *basicAuth) } } // TODO Support other envoy filters IngressLog.Infof("Found %d number of envoyFilters", len(envoyFilters)) m.mutex.Lock() m.cachedEnvoyFilters = envoyFilters m.mutex.Unlock() } func (m *IngressConfig) convertWasmPlugin([]common.WrapperConfig) []config.Config { m.mutex.RLock() defer m.mutex.RUnlock() out := make([]config.Config, 0, len(m.wasmPlugins)) for name, wasmPlugin := range m.wasmPlugins { out = append(out, config.Config{ Meta: config.Meta{ GroupVersionKind: gvk.WasmPlugin, Name: name, Namespace: m.namespace, }, Spec: wasmPlugin, }) } // add wasm plugin from nacos for mcp server if m.RegistryReconciler != nil { wasmFromMcp := m.RegistryReconciler.GetAllConfigs(gvk.WasmPlugin) for _, cfg := range wasmFromMcp { out = append(out, *cfg) } } return out } func (m *IngressConfig) convertServiceEntry([]common.WrapperConfig) []config.Config { if m.RegistryReconciler == nil { return nil } serviceEntries := m.RegistryReconciler.GetAllServiceWrapper() IngressLog.Infof("Found mcp serviceEntries %v", serviceEntries) out := make([]config.Config, 0, len(serviceEntries)) hostSets := sets.Set[string]{} for _, se := range serviceEntries { out = append(out, config.Config{ Meta: config.Meta{ GroupVersionKind: gvk.ServiceEntry, Name: se.ServiceEntry.Hosts[0], Namespace: "mcp", CreationTimestamp: se.GetCreateTime(), Labels: map[string]string{ higressconst.RegistryTypeLabelKey: se.RegistryType, higressconst.RegistryNameLabelKey: se.RegistryName, }, }, Spec: se.ServiceEntry, }) hostSets.Insert(se.ServiceEntry.Hosts[0]) } // add service entry by host from nacos3 for mcp server seFromMcp := m.RegistryReconciler.GetAllConfigs(gvk.ServiceEntry) for _, cfg := range seFromMcp { se := cfg.Spec.(*networking.ServiceEntry) if !hostSets.Contains(se.Hosts[0]) { out = append(out, *cfg) } } return out } func (m *IngressConfig) convertDestinationRule(configs []common.WrapperConfig) []config.Config { convertOptions := common.ConvertOptions{ Service2TrafficPolicy: map[common.ServiceKey]*common.WrapperTrafficPolicy{}, } // Convert destination from service within ingress rule. for idx := range configs { cfg := configs[idx] clusterId := common.GetClusterId(cfg.Config.Annotations) m.mutex.RLock() ingressController := m.remoteIngressControllers[clusterId] m.mutex.RUnlock() if ingressController == nil { continue } if err := ingressController.ConvertTrafficPolicy(&convertOptions, &cfg); err != nil { IngressLog.Errorf("Convert ingress %s/%s to destination rule fail in cluster %s, err %v", cfg.Config.Namespace, cfg.Config.Name, clusterId, err) } } IngressLog.Debugf("traffic policy number %d", len(convertOptions.Service2TrafficPolicy)) for _, wrapperTrafficPolicy := range convertOptions.Service2TrafficPolicy { m.annotationHandler.ApplyTrafficPolicy(wrapperTrafficPolicy.TrafficPolicy, wrapperTrafficPolicy.PortTrafficPolicy, wrapperTrafficPolicy.WrapperConfig.AnnotationsConfig) } // Merge multi-port traffic policy per service into one destination rule. destinationRules := map[string]*common.WrapperDestinationRule{} for key, wrapperTrafficPolicy := range convertOptions.Service2TrafficPolicy { var serviceName string if key.ServiceFQDN != "" { serviceName = key.ServiceFQDN } else { serviceName = util.CreateServiceFQDN(key.Namespace, key.Name) } dr, exist := destinationRules[serviceName] if !exist { trafficPolicy := &networking.TrafficPolicy{} if wrapperTrafficPolicy.PortTrafficPolicy != nil { trafficPolicy.PortLevelSettings = []*networking.TrafficPolicy_PortTrafficPolicy{wrapperTrafficPolicy.PortTrafficPolicy} } else if wrapperTrafficPolicy.TrafficPolicy != nil { trafficPolicy = wrapperTrafficPolicy.TrafficPolicy } dr = &common.WrapperDestinationRule{ DestinationRule: &networking.DestinationRule{ Host: serviceName, TrafficPolicy: trafficPolicy, }, WrapperConfig: wrapperTrafficPolicy.WrapperConfig, ServiceKey: key, } } else if wrapperTrafficPolicy.PortTrafficPolicy != nil { dr.DestinationRule.TrafficPolicy.PortLevelSettings = append(dr.DestinationRule.TrafficPolicy.PortLevelSettings, wrapperTrafficPolicy.PortTrafficPolicy) } destinationRules[serviceName] = dr } if m.RegistryReconciler != nil { drws := m.RegistryReconciler.GetAllDestinationRuleWrapper() for _, destinationRuleWrapper := range drws { serviceName := destinationRuleWrapper.ServiceKey.ServiceFQDN dr, exist := destinationRules[serviceName] if !exist { destinationRules[serviceName] = destinationRuleWrapper } else if dr.DestinationRule.TrafficPolicy != nil { if dr.DestinationRule.TrafficPolicy.LoadBalancer == nil && destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer != nil { dr.DestinationRule.TrafficPolicy.LoadBalancer = destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer } portTrafficPolicy := destinationRuleWrapper.DestinationRule.TrafficPolicy.PortLevelSettings[0] portUpdated := false for _, policy := range dr.DestinationRule.TrafficPolicy.PortLevelSettings { if policy.Port.Number == portTrafficPolicy.Port.Number { policy.Tls = portTrafficPolicy.Tls portUpdated = true break } } if portUpdated { continue } dr.DestinationRule.TrafficPolicy.PortLevelSettings = append(dr.DestinationRule.TrafficPolicy.PortLevelSettings, portTrafficPolicy) } } } out := make([]config.Config, 0, len(destinationRules)) for _, dr := range destinationRules { sort.SliceStable(dr.DestinationRule.TrafficPolicy.PortLevelSettings, func(i, j int) bool { portI := dr.DestinationRule.TrafficPolicy.PortLevelSettings[i].Port portJ := dr.DestinationRule.TrafficPolicy.PortLevelSettings[j].Port if portI == nil && portJ == nil { return true } else if portI == nil { return true } else if portJ == nil { return false } return portI.Number < portJ.Number }) drName := util.CreateDestinationRuleName(m.clusterId, dr.ServiceKey.Namespace, dr.ServiceKey.Name) out = append(out, config.Config{ Meta: config.Meta{ GroupVersionKind: gvk.DestinationRule, Name: common.CreateConvertedName(constants.IstioIngressGatewayName, drName), Namespace: m.namespace, }, Spec: dr.DestinationRule, }) } return out } func (m *IngressConfig) applyAppRoot(convertOptions *common.ConvertOptions) { for host, wrapVS := range convertOptions.VirtualServices { if wrapVS.AppRoot != "" { route := &common.WrapperHTTPRoute{ HTTPRoute: &networking.HTTPRoute{ Name: common.CreateConvertedName(host, "app-root"), Match: []*networking.HTTPMatchRequest{ { Uri: &networking.StringMatch{ MatchType: &networking.StringMatch_Exact{ Exact: "/", }, }, }, }, Redirect: &networking.HTTPRedirect{ RedirectCode: 302, Uri: wrapVS.AppRoot, }, }, WrapperConfig: wrapVS.WrapperConfig, ClusterId: wrapVS.WrapperConfig.AnnotationsConfig.ClusterId, } convertOptions.HTTPRoutes[host] = append([]*common.WrapperHTTPRoute{route}, convertOptions.HTTPRoutes[host]...) } } } func (m *IngressConfig) applyInternalActiveRedirect(convertOptions *common.ConvertOptions) { for host, routes := range convertOptions.HTTPRoutes { var tempRoutes []*common.WrapperHTTPRoute for _, route := range routes { tempRoutes = append(tempRoutes, route) if route.HTTPRoute.InternalActiveRedirect != nil { fallbackConfig := route.WrapperConfig.AnnotationsConfig.Fallback if fallbackConfig == nil { continue } typedNamespace := fallbackConfig.DefaultBackend internalRedirectRoute := route.HTTPRoute.DeepCopy() internalRedirectRoute.Name = internalRedirectRoute.Name + annotations.FallbackRouteNameSuffix internalRedirectRoute.InternalActiveRedirect = nil internalRedirectRoute.Match = []*networking.HTTPMatchRequest{ { Uri: &networking.StringMatch{ MatchType: &networking.StringMatch_Exact{ Exact: "/", }, }, Headers: map[string]*networking.StringMatch{ annotations.FallbackInjectHeaderRouteName: { MatchType: &networking.StringMatch_Exact{ Exact: internalRedirectRoute.Name, }, }, annotations.FallbackInjectFallbackService: { MatchType: &networking.StringMatch_Exact{ Exact: typedNamespace.String(), }, }, }, }, } internalRedirectRoute.Route = []*networking.HTTPRouteDestination{ { Destination: &networking.Destination{ Host: util.CreateServiceFQDN(typedNamespace.Namespace, typedNamespace.Name), Port: &networking.PortSelector{ Number: fallbackConfig.Port, }, }, Weight: 100, }, } tempRoutes = append([]*common.WrapperHTTPRoute{{ HTTPRoute: internalRedirectRoute, WrapperConfig: route.WrapperConfig, ClusterId: route.ClusterId, }}, tempRoutes...) } } convertOptions.HTTPRoutes[host] = tempRoutes } } func (m *IngressConfig) convertIstioWasmPlugin(obj *higressext.WasmPlugin) (*extensions.WasmPlugin, error) { result := &extensions.WasmPlugin{ Selector: &istiotype.WorkloadSelector{ MatchLabels: map[string]string{ m.commonOptions.GatewaySelectorKey: m.commonOptions.GatewaySelectorValue, }, }, Url: obj.Url, Sha256: obj.Sha256, ImagePullPolicy: extensions.PullPolicy(obj.ImagePullPolicy), ImagePullSecret: obj.ImagePullSecret, VerificationKey: obj.VerificationKey, PluginConfig: obj.PluginConfig, PluginName: obj.PluginName, Phase: extensions.PluginPhase(obj.Phase), FailStrategy: extensions.FailStrategy(obj.FailStrategy), Priority: obj.Priority, } if obj.VmConfig != nil { result.VmConfig = &extensions.VmConfig{} for _, env := range obj.VmConfig.Env { result.VmConfig.Env = append(result.VmConfig.Env, &extensions.EnvVar{ Name: env.Name, ValueFrom: extensions.EnvValueSource(env.ValueFrom), Value: env.Value, }) } } if result.PluginConfig != nil { return result, nil } if !isBoolValueTrue(obj.DefaultConfigDisable) { result.PluginConfig = obj.DefaultConfig } hasValidRule := false if len(obj.MatchRules) > 0 { if result.PluginConfig == nil { result.PluginConfig = &_struct.Struct{ Fields: map[string]*_struct.Value{}, } } var ruleValues []*_struct.Value for _, rule := range obj.MatchRules { if isBoolValueTrue(rule.ConfigDisable) { continue } if rule.Config == nil { rule.Config = &_struct.Struct{ Fields: map[string]*_struct.Value{}, } } v := &_struct.Value_StructValue{ StructValue: rule.Config, } validRule := false var matchItems []*_struct.Value // match ingress for _, ing := range rule.Ingress { matchItems = append(matchItems, &_struct.Value{ Kind: &_struct.Value_StringValue{ StringValue: ing, }, }) } if len(matchItems) > 0 { validRule = true v.StructValue.Fields["_match_route_"] = &_struct.Value{ Kind: &_struct.Value_ListValue{ ListValue: &_struct.ListValue{ Values: matchItems, }, }, } } // match service matchItems = nil for _, service := range rule.Service { matchItems = append(matchItems, &_struct.Value{ Kind: &_struct.Value_StringValue{ StringValue: service, }, }) } if len(matchItems) > 0 { validRule = true v.StructValue.Fields["_match_service_"] = &_struct.Value{ Kind: &_struct.Value_ListValue{ ListValue: &_struct.ListValue{ Values: matchItems, }, }, } } // match domain matchItems = nil for _, domain := range rule.Domain { matchItems = append(matchItems, &_struct.Value{ Kind: &_struct.Value_StringValue{ StringValue: domain, }, }) } if len(matchItems) > 0 { validRule = true v.StructValue.Fields["_match_domain_"] = &_struct.Value{ Kind: &_struct.Value_ListValue{ ListValue: &_struct.ListValue{ Values: matchItems, }, }, } } if validRule { ruleValues = append(ruleValues, &_struct.Value{ Kind: v, }) } else { return nil, fmt.Errorf("invalid match rule has no match condition, rule:%v", rule) } } if len(ruleValues) > 0 { hasValidRule = true result.PluginConfig.Fields["_rules_"] = &_struct.Value{ Kind: &_struct.Value_ListValue{ ListValue: &_struct.ListValue{ Values: ruleValues, }, }, } } } if !hasValidRule && isBoolValueTrue(obj.DefaultConfigDisable) { return nil, nil } return result, nil } func isBoolValueTrue(b *wrappers.BoolValue) bool { return b != nil && b.Value } func (m *IngressConfig) AddOrUpdateWasmPlugin(clusterNamespacedName util.ClusterNamespacedName) { if clusterNamespacedName.Namespace != m.namespace { return } wasmPlugin, err := m.wasmPluginLister.WasmPlugins(clusterNamespacedName.Namespace).Get(clusterNamespacedName.Name) if err != nil { IngressLog.Errorf("wasmPlugin is not found, namespace:%s, name:%s", clusterNamespacedName.Namespace, clusterNamespacedName.Name) return } metadata := config.Meta{ Name: clusterNamespacedName.Name + "-wasmplugin", Namespace: clusterNamespacedName.Namespace, GroupVersionKind: gvk.WasmPlugin, // Set this label so that we do not compare configs and just push. Labels: map[string]string{constants.AlwaysPushLabel: "true"}, } for _, f := range m.wasmPluginHandlers { IngressLog.Debug("WasmPlugin triggerd update") f(config.Config{Meta: metadata}, config.Config{Meta: metadata}, istiomodel.EventUpdate) } istioWasmPlugin, err := m.convertIstioWasmPlugin(&wasmPlugin.Spec) if err != nil { IngressLog.Errorf("invalid wasmPlugin:%s, err:%v", clusterNamespacedName.Name, err) return } if istioWasmPlugin == nil { IngressLog.Infof("wasmPlugin:%s will not be transferred to istio since config disabled", clusterNamespacedName.Name) m.mutex.Lock() delete(m.wasmPlugins, clusterNamespacedName.Name) m.mutex.Unlock() return } IngressLog.Debugf("wasmPlugin:%s convert to istioWasmPlugin:%v", clusterNamespacedName.Name, istioWasmPlugin) m.mutex.Lock() m.wasmPlugins[clusterNamespacedName.Name] = istioWasmPlugin m.mutex.Unlock() } func (m *IngressConfig) DeleteWasmPlugin(clusterNamespacedName util.ClusterNamespacedName) { if clusterNamespacedName.Namespace != m.namespace { return } var hit bool m.mutex.Lock() if _, ok := m.wasmPlugins[clusterNamespacedName.Name]; ok { delete(m.wasmPlugins, clusterNamespacedName.Name) hit = true } m.mutex.Unlock() if hit { metadata := config.Meta{ Name: clusterNamespacedName.Name + "-wasmplugin", Namespace: clusterNamespacedName.Namespace, GroupVersionKind: gvk.WasmPlugin, // Set this label so that we do not compare configs and just push. Labels: map[string]string{constants.AlwaysPushLabel: "true"}, } for _, f := range m.wasmPluginHandlers { IngressLog.Debug("WasmPlugin triggerd update") f(config.Config{Meta: metadata}, config.Config{Meta: metadata}, istiomodel.EventDelete) } } } func (m *IngressConfig) AddOrUpdateMcpBridge(clusterNamespacedName util.ClusterNamespacedName) { // TODO: get resource name from config if clusterNamespacedName.Name != DefaultMcpbridgeName || clusterNamespacedName.Namespace != m.namespace { return } mcpbridge, err := m.mcpbridgeLister.McpBridges(clusterNamespacedName.Namespace).Get(clusterNamespacedName.Name) if err != nil { IngressLog.Errorf("Mcpbridge is not found, namespace:%s, name:%s", clusterNamespacedName.Namespace, clusterNamespacedName.Name) return } if m.RegistryReconciler == nil { m.RegistryReconciler = reconcile.NewReconciler(func() { seMetadata := config.Meta{ Name: "mcpbridge-serviceentry", Namespace: m.namespace, GroupVersionKind: gvk.ServiceEntry, // Set this label so that we do not compare configs and just push. Labels: map[string]string{constants.AlwaysPushLabel: "true"}, } drMetadata := config.Meta{ Name: "mcpbridge-destinationrule", Namespace: m.namespace, GroupVersionKind: gvk.DestinationRule, // Set this label so that we do not compare configs and just push. Labels: map[string]string{constants.AlwaysPushLabel: "true"}, } vsMetadata := config.Meta{ Name: "mcpbridge-virtualservice", Namespace: m.namespace, GroupVersionKind: gvk.VirtualService, // Set this label so that we do not compare configs and just push. Labels: map[string]string{constants.AlwaysPushLabel: "true"}, } wasmMetadata := config.Meta{ Name: "mcpbridge-wasmplugin", Namespace: m.namespace, GroupVersionKind: gvk.WasmPlugin, // Set this label so that we do not compare configs and just push. Labels: map[string]string{constants.AlwaysPushLabel: "true"}, } efMetadata := config.Meta{ Name: "mcpbridge-envoyfilter", Namespace: m.namespace, GroupVersionKind: gvk.EnvoyFilter, // Set this label so that we do not compare configs and just push. Labels: map[string]string{constants.AlwaysPushLabel: "true"}, } for _, f := range m.serviceEntryHandlers { IngressLog.Debug("McpBridge triggerd serviceEntry update") f(config.Config{Meta: seMetadata}, config.Config{Meta: seMetadata}, istiomodel.EventUpdate) } for _, f := range m.destinationRuleHandlers { IngressLog.Debug("McpBridge triggerd destinationRule update") f(config.Config{Meta: drMetadata}, config.Config{Meta: drMetadata}, istiomodel.EventUpdate) } for _, f := range m.virtualServiceHandlers { IngressLog.Debug("McpBridge triggerd virtualservice update") f(config.Config{Meta: vsMetadata}, config.Config{Meta: vsMetadata}, istiomodel.EventUpdate) } for _, f := range m.wasmPluginHandlers { IngressLog.Debug("McpBridge triggerd wasmplugin update") f(config.Config{Meta: wasmMetadata}, config.Config{Meta: wasmMetadata}, istiomodel.EventUpdate) } for _, f := range m.envoyFilterHandlers { IngressLog.Debug("McpBridge triggerd envoyfilter update") f(config.Config{Meta: efMetadata}, config.Config{Meta: efMetadata}, istiomodel.EventUpdate) } }, m.localKubeClient, m.namespace, m.clusterId.String()) } reconciler := m.RegistryReconciler m.configmapMgr.SetMcpReconciler(m.RegistryReconciler) err = reconciler.Reconcile(mcpbridge) if err != nil { IngressLog.Errorf("Mcpbridge reconcile failed, err:%v", err) return } IngressLog.Info("Mcpbridge reconciled") } func (m *IngressConfig) DeleteMcpBridge(clusterNamespacedName util.ClusterNamespacedName) { // TODO: get resource name from config if clusterNamespacedName.Name != "default" || clusterNamespacedName.Namespace != m.namespace { return } if m.RegistryReconciler != nil { go m.RegistryReconciler.Reconcile(nil) m.RegistryReconciler = nil } } func (m *IngressConfig) AddOrUpdateHttp2Rpc(clusterNamespacedName util.ClusterNamespacedName) { if clusterNamespacedName.Namespace != m.namespace { return } http2rpc, err := m.http2rpcLister.Http2Rpcs(clusterNamespacedName.Namespace).Get(clusterNamespacedName.Name) if err != nil { IngressLog.Errorf("http2rpc is not found, namespace:%s, name:%s", clusterNamespacedName.Namespace, clusterNamespacedName.Name) return } m.mutex.Lock() m.http2rpcs[clusterNamespacedName.Name] = &http2rpc.Spec m.mutex.Unlock() IngressLog.Infof("AddOrUpdateHttp2Rpc http2rpc ingress name %s", clusterNamespacedName.Name) push := func(gvk config.GroupVersionKind) { m.XDSUpdater.ConfigUpdate(&istiomodel.PushRequest{ Full: true, ConfigsUpdated: map[istiomodel.ConfigKey]struct{}{{ Kind: kind.MustFromGVK(gvk), Name: clusterNamespacedName.Name, Namespace: clusterNamespacedName.Namespace, }: {}}, Reason: istiomodel.NewReasonStats("Http2Rpc-AddOrUpdate"), }) } push(gvk.VirtualService) push(gvk.EnvoyFilter) } func (m *IngressConfig) DeleteHttp2Rpc(clusterNamespacedName util.ClusterNamespacedName) { IngressLog.Infof("Http2Rpc triggered deleted event %s", clusterNamespacedName.Name) if clusterNamespacedName.Namespace != m.namespace { return } var hit bool m.mutex.Lock() if _, ok := m.http2rpcs[clusterNamespacedName.Name]; ok { delete(m.http2rpcs, clusterNamespacedName.Name) hit = true } m.mutex.Unlock() if hit { IngressLog.Infof("Http2Rpc triggerd deleted event executed %s", clusterNamespacedName.Name) push := func(gvk config.GroupVersionKind) { m.XDSUpdater.ConfigUpdate(&istiomodel.PushRequest{ Full: true, ConfigsUpdated: map[istiomodel.ConfigKey]struct{}{{ Kind: kind.MustFromGVK(gvk), Name: clusterNamespacedName.Name, Namespace: clusterNamespacedName.Namespace, }: {}}, Reason: istiomodel.NewReasonStats("Http2Rpc-Deleted"), }) } push(gvk.VirtualService) push(gvk.EnvoyFilter) } } func (m *IngressConfig) ReflectSecretChanges(clusterNamespacedName util.ClusterNamespacedName) { var hit bool m.mutex.RLock() if m.watchedSecretSet.Contains(clusterNamespacedName.String()) { hit = true } m.mutex.RUnlock() if hit { push := func(gvk config.GroupVersionKind) { m.XDSUpdater.ConfigUpdate(&istiomodel.PushRequest{ Full: true, ConfigsUpdated: map[istiomodel.ConfigKey]struct{}{{ Kind: kind.MustFromGVK(gvk), Name: clusterNamespacedName.Name, Namespace: clusterNamespacedName.Namespace, }: {}}, Reason: istiomodel.NewReasonStats("auth-secret-change"), }) } push(gvk.VirtualService) push(gvk.EnvoyFilter) } } func normalizeWeightedCluster(cache *common.IngressRouteCache, route *common.WrapperHTTPRoute) { if len(route.HTTPRoute.Route) == 1 { route.HTTPRoute.Route[0].Weight = 100 return } var weightTotal int32 = 0 for idx, routeDestination := range route.HTTPRoute.Route { if idx == 0 { continue } weightTotal += routeDestination.Weight } if weightTotal < route.WeightTotal { weightTotal = route.WeightTotal } var sum int32 for idx, routeDestination := range route.HTTPRoute.Route { if idx == 0 { continue } weight := float32(routeDestination.Weight) / float32(weightTotal) routeDestination.Weight = int32(weight * 100) sum += routeDestination.Weight } route.HTTPRoute.Route[0].Weight = 100 - sum // Update the recorded status in ingress builder if cache != nil { cache.Update(route) } } func (m *IngressConfig) applyCanaryIngresses(convertOptions *common.ConvertOptions) { if len(convertOptions.CanaryIngresses) == 0 { return } IngressLog.Infof("Found %d number of canary ingresses.", len(convertOptions.CanaryIngresses)) for _, cfg := range convertOptions.CanaryIngresses { clusterId := common.GetClusterId(cfg.Config.Annotations) m.mutex.RLock() ingressController := m.remoteIngressControllers[clusterId] m.mutex.RUnlock() if ingressController == nil { continue } if err := ingressController.ApplyCanaryIngress(convertOptions, cfg); err != nil { IngressLog.Errorf("Apply canary ingress %s/%s fail in cluster %s, err %v", cfg.Config.Namespace, cfg.Config.Name, clusterId, err) } } } func (m *IngressConfig) constructHttp2RpcEnvoyFilter(http2rpcConfig *annotations.Http2RpcConfig, route *common.WrapperHTTPRoute, namespace string, initHttp2RpcGlobalConfig bool) (*config.Config, error) { mappings := m.http2rpcs IngressLog.Infof("Found http2rpc mappings %v", mappings) if _, exist := mappings[http2rpcConfig.Name]; !exist { IngressLog.Errorf("Http2RpcConfig name %s, not found Http2Rpc CRD", http2rpcConfig.Name) return nil, errors.New("invalid http2rpcConfig has no usable http2rpc") } http2rpcCRD := mappings[http2rpcConfig.Name] if http2rpcCRD.GetDubbo() == nil { IngressLog.Errorf("Http2RpcConfig name %s, only support Http2Rpc CRD Dubbo Service type", http2rpcConfig.Name) return nil, errors.New("invalid http2rpcConfig has no usable http2rpc") } httpRoute := route.HTTPRoute httpRouteDestination := httpRoute.Route[0] typeStruct, err := m.constructHttp2RpcMethods(http2rpcCRD.GetDubbo()) if err != nil { return nil, errors.New(err.Error()) } configPatches := []*networking.EnvoyFilter_EnvoyConfigObjectPatch{ { ApplyTo: networking.EnvoyFilter_HTTP_ROUTE, Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{ Context: networking.EnvoyFilter_GATEWAY, ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_RouteConfiguration{ RouteConfiguration: &networking.EnvoyFilter_RouteConfigurationMatch{ Vhost: &networking.EnvoyFilter_RouteConfigurationMatch_VirtualHostMatch{ Route: &networking.EnvoyFilter_RouteConfigurationMatch_RouteMatch{ Name: httpRoute.Name, }, }, }, }, }, Patch: &networking.EnvoyFilter_Patch{ Operation: networking.EnvoyFilter_Patch_MERGE, Value: typeStruct, }, }, { ApplyTo: networking.EnvoyFilter_CLUSTER, Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{ Context: networking.EnvoyFilter_GATEWAY, ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Cluster{ Cluster: &networking.EnvoyFilter_ClusterMatch{ Service: httpRouteDestination.Destination.Host, }, }, }, Patch: &networking.EnvoyFilter_Patch{ Operation: networking.EnvoyFilter_Patch_MERGE, Value: buildPatchStruct(`{ "upstream_config": { "name":"envoy.upstreams.http.dubbo_tcp", "typed_config":{ "@type":"type.googleapis.com/udpa.type.v1.TypedStruct", "type_url":"type.googleapis.com/envoy.extensions.upstreams.http.dubbo_tcp.v3.DubboTcpConnectionPoolProto" } } }`), }, }, } if initHttp2RpcGlobalConfig { configPatches = append(configPatches, &networking.EnvoyFilter_EnvoyConfigObjectPatch{ ApplyTo: networking.EnvoyFilter_HTTP_FILTER, Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{ Context: networking.EnvoyFilter_GATEWAY, ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Listener{ Listener: &networking.EnvoyFilter_ListenerMatch{ FilterChain: &networking.EnvoyFilter_ListenerMatch_FilterChainMatch{ Filter: &networking.EnvoyFilter_ListenerMatch_FilterMatch{ Name: "envoy.filters.network.http_connection_manager", SubFilter: &networking.EnvoyFilter_ListenerMatch_SubFilterMatch{ Name: "envoy.filters.http.router", }, }, }, }, }, }, Patch: &networking.EnvoyFilter_Patch{ Operation: networking.EnvoyFilter_Patch_INSERT_BEFORE, Value: buildPatchStruct(`{ "name":"envoy.filters.http.http_dubbo_transcoder", "typed_config":{ "@type":"type.googleapis.com/udpa.type.v1.TypedStruct", "type_url":"type.googleapis.com/envoy.extensions.filters.http.http_dubbo_transcoder.v3.HttpDubboTranscoder" } }`), }, }) } return &config.Config{ Meta: config.Meta{ GroupVersionKind: gvk.EnvoyFilter, Name: common.CreateConvertedName(constants.IstioIngressGatewayName, http2rpcConfig.Name), Namespace: namespace, }, Spec: &networking.EnvoyFilter{ ConfigPatches: configPatches, }, }, nil } func (m *IngressConfig) constructHttp2RpcMethods(dubbo *higressv1.DubboService) (*_struct.Struct, error) { httpRouterTemplate := `{ "route": { "upgrade_configs": [ { "connect_config": { "allow_post": true }, "upgrade_type": "CONNECT" } ] }, "typed_per_filter_config": { "envoy.filters.http.http_dubbo_transcoder": { "@type": "type.googleapis.com/udpa.type.v1.TypedStruct", "type_url": "type.googleapis.com/envoy.extensions.filters.http.http_dubbo_transcoder.v3.HttpDubboTranscoder", "value": { "request_validation_options": { "reject_unknown_method": true, "reject_unknown_query_parameters": true }, "services_mapping": %s, "url_unescape_spec": "ALL_CHARACTERS_EXCEPT_RESERVED" } } } }` var methods []interface{} for _, serviceMethod := range dubbo.GetMethods() { var method = make(map[string]interface{}) method["name"] = serviceMethod.GetServiceMethod() var params []interface{} // paramFromEntireBody is for methods with single parameter. So when paramFromEntireBody exists, we just ignore params. var paramFromEntireBody = serviceMethod.GetParamFromEntireBody() if paramFromEntireBody != nil { var param = make(map[string]interface{}) param["extract_key_spec"] = Http2RpcParamSourceMap()["BODY"] param["mapping_type"] = paramFromEntireBody.GetParamType() params = append(params, param) } else { for _, methodParam := range serviceMethod.GetParams() { var param = make(map[string]interface{}) param["extract_key"] = methodParam.GetParamKey() param["extract_key_spec"] = Http2RpcParamSourceMap()[methodParam.GetParamSource()] param["mapping_type"] = methodParam.GetParamType() params = append(params, param) } } method["parameter_mapping"] = params var path_matcher = make(map[string]interface{}) path_matcher["match_http_method_spec"] = Http2RpcMethodMap()[serviceMethod.HttpMethods[0]] path_matcher["match_pattern"] = serviceMethod.GetHttpPath() method["path_matcher"] = path_matcher var passthrough_setting = make(map[string]interface{}) var headersAttach = serviceMethod.GetHeadersAttach() if headersAttach == "" { passthrough_setting["passthrough_all_headers"] = false } else if headersAttach == "*" { passthrough_setting["passthrough_all_headers"] = true } else { passthrough_setting["passthrough_headers"] = headersAttach } method["passthrough_setting"] = passthrough_setting methods = append(methods, method) } var serviceMapping = make(map[string]interface{}) var dubboServiceGroup = dubbo.GetGroup() if dubboServiceGroup != "" { serviceMapping["group"] = dubboServiceGroup } serviceMapping["name"] = dubbo.GetService() serviceMapping["version"] = dubbo.GetVersion() serviceMapping["method_mapping"] = methods strBuffer := new(bytes.Buffer) serviceMappingJsonStr, _ := json.Marshal(serviceMapping) fmt.Fprintf(strBuffer, httpRouterTemplate, string(serviceMappingJsonStr)) IngressLog.Infof("Found http2rpc buildHttp2RpcMethods %s", strBuffer.String()) result := buildPatchStruct(strBuffer.String()) return result, nil } func buildPatchStruct(config string) *_struct.Struct { val := &_struct.Struct{} err := jsonpb.Unmarshal(strings.NewReader(config), val) if err != nil { log.Errorf("jsonpb unmarshal failed: %s", config) } return val } func constructBasicAuthEnvoyFilter(rules *common.BasicAuthRules, namespace string) (*config.Config, error) { rulesStr, err := json.Marshal(rules) if err != nil { return nil, err } configuration := &wrappers.StringValue{ Value: string(rulesStr), } wasm := &wasm.Wasm{ Config: &v3.PluginConfig{ Name: "basic-auth", FailOpen: true, Vm: &v3.PluginConfig_VmConfig{ VmConfig: &v3.VmConfig{ Runtime: "envoy.wasm.runtime.null", Code: &corev3.AsyncDataSource{ Specifier: &corev3.AsyncDataSource_Local{ Local: &corev3.DataSource{ Specifier: &corev3.DataSource_InlineString{ InlineString: "envoy.wasm.basic_auth", }, }, }, }, }, }, Configuration: protoconv.MessageToAny(configuration), }, } wasmAny, err := anypb.New(wasm) if err != nil { return nil, err } typedConfig := &httppb.HttpFilter{ Name: "basic-auth", ConfigType: &httppb.HttpFilter_TypedConfig{ TypedConfig: wasmAny, }, } pbTypedConfig, err := util.MessageToStruct(typedConfig) if err != nil { return nil, err } return &config.Config{ Meta: config.Meta{ GroupVersionKind: gvk.EnvoyFilter, Name: common.CreateConvertedName(constants.IstioIngressGatewayName, "basic-auth"), Namespace: namespace, }, Spec: &networking.EnvoyFilter{ ConfigPatches: []*networking.EnvoyFilter_EnvoyConfigObjectPatch{ { ApplyTo: networking.EnvoyFilter_HTTP_FILTER, Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{ Context: networking.EnvoyFilter_GATEWAY, ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Listener{ Listener: &networking.EnvoyFilter_ListenerMatch{ FilterChain: &networking.EnvoyFilter_ListenerMatch_FilterChainMatch{ Filter: &networking.EnvoyFilter_ListenerMatch_FilterMatch{ Name: "envoy.filters.network.http_connection_manager", SubFilter: &networking.EnvoyFilter_ListenerMatch_SubFilterMatch{ Name: "envoy.filters.http.cors", }, }, }, }, }, }, Patch: &networking.EnvoyFilter_Patch{ Operation: networking.EnvoyFilter_Patch_INSERT_AFTER, Value: pbTypedConfig, }, }, }, }, }, nil } func QueryByName(serviceEntries []*memory.ServiceWrapper, serviceName string) (*memory.ServiceWrapper, error) { IngressLog.Infof("Found http2rpc serviceEntries %s", serviceEntries) for _, se := range serviceEntries { if se.ServiceName == serviceName { return se, nil } } return nil, fmt.Errorf("can't find ServiceEntry by serviceName:%v", serviceName) } func QueryRpcServiceVersion(serviceEntry *memory.ServiceWrapper, serviceName string) (string, error) { IngressLog.Infof("Found http2rpc serviceEntry %s", serviceEntry) IngressLog.Infof("Found http2rpc ServiceEntry %s", serviceEntry.ServiceEntry) IngressLog.Infof("Found http2rpc WorkloadSelector %s", serviceEntry.ServiceEntry.WorkloadSelector) IngressLog.Infof("Found http2rpc Labels %s", serviceEntry.ServiceEntry.WorkloadSelector.Labels) labels := (*serviceEntry).ServiceEntry.WorkloadSelector.Labels for key, value := range labels { if key == "version" { return value, nil } } return "", fmt.Errorf("can't get RpcServiceVersion for serviceName:%v", serviceName) } func (m *IngressConfig) Run(stop <-chan struct{}) { for _, remoteIngressController := range m.remoteIngressControllers { _ = remoteIngressController.SetWatchErrorHandler(m.watchErrorHandler) go remoteIngressController.Run(stop) } for _, remoteGatewayController := range m.remoteGatewayControllers { _ = remoteGatewayController.SetWatchErrorHandler(m.watchErrorHandler) go remoteGatewayController.Run(stop) } go m.mcpbridgeController.Run(stop) go m.wasmPluginController.Run(stop) go m.http2rpcController.Run(stop) go m.configmapMgr.HigressConfigController.Run(stop) } func (m *IngressConfig) HasSynced() bool { m.mutex.RLock() defer m.mutex.RUnlock() for _, remoteIngressController := range m.remoteIngressControllers { if !remoteIngressController.HasSynced() { return false } } for _, remoteGatewayController := range m.remoteGatewayControllers { if !remoteGatewayController.HasSynced() { return false } } if !m.mcpbridgeController.HasSynced() { return false } if !m.wasmPluginController.HasSynced() { return false } if !m.http2rpcController.HasSynced() { return false } if !m.configmapMgr.HigressConfigController.HasSynced() { return false } IngressLog.Info("Ingress config controller synced.") return true } func (m *IngressConfig) SetWatchErrorHandler(f func(r *cache.Reflector, err error)) error { m.watchErrorHandler = f return nil } func (m *IngressConfig) GetIngressRoutes() istiomodel.IngressRouteCollection { m.mutex.RLock() defer m.mutex.RUnlock() return m.ingressRouteCache } func (m *IngressConfig) GetIngressDomains() istiomodel.IngressDomainCollection { m.mutex.RLock() defer m.mutex.RUnlock() return m.ingressDomainCache } func (m *IngressConfig) CheckIngress(clusterName string) istiomodel.CheckIngressResponse { return istiomodel.CheckIngressResponse{} } func (m *IngressConfig) Services(clusterName string) ([]*v1.Service, error) { return nil, nil } func (m *IngressConfig) IngressControllers() map[string]string { return nil } func (m *IngressConfig) Schemas() collection.Schemas { return common.IngressIR } func (m *IngressConfig) Get(config.GroupVersionKind, string, string) *config.Config { return nil } func (m *IngressConfig) Create(config.Config) (revision string, err error) { return "", common.ErrUnsupportedOp } func (m *IngressConfig) Update(config.Config) (newRevision string, err error) { return "", common.ErrUnsupportedOp } func (m *IngressConfig) UpdateStatus(config.Config) (newRevision string, err error) { return "", common.ErrUnsupportedOp } func (m *IngressConfig) Patch(config.Config, config.PatchFunc) (string, error) { return "", common.ErrUnsupportedOp } func (m *IngressConfig) Delete(config.GroupVersionKind, string, string, *string) error { return common.ErrUnsupportedOp }