pkg/ingress/kube/ingressv1/controller.go (1,069 lines of code) (raw):
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ingressv1
import (
"errors"
"fmt"
"path"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"github.com/hashicorp/go-multierror"
networking "istio.io/api/networking/v1alpha3"
"istio.io/istio/pilot/pkg/model"
istiomodel "istio.io/istio/pilot/pkg/model"
"istio.io/istio/pilot/pkg/model/credentials"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/protocol"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/config/schema/gvr"
schemakubeclient "istio.io/istio/pkg/config/schema/kubeclient"
kubeclient "istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
"istio.io/istio/pkg/kube/informerfactory"
ktypes "istio.io/istio/pkg/kube/kubetypes"
"istio.io/istio/pkg/util/sets"
ingress "k8s.io/api/networking/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
listerv1 "k8s.io/client-go/listers/core/v1"
networkinglister "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
"github.com/alibaba/higress/pkg/cert"
"github.com/alibaba/higress/pkg/ingress/kube/annotations"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/secret"
"github.com/alibaba/higress/pkg/ingress/kube/util"
. "github.com/alibaba/higress/pkg/ingress/log"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
)
var (
_ common.IngressController = &controller{}
// follow specification of ingress-nginx
defaultPathType = ingress.PathTypePrefix
)
type controller struct {
queue controllers.Queue
virtualServiceHandlers []istiomodel.EventHandler
gatewayHandlers []istiomodel.EventHandler
destinationRuleHandlers []istiomodel.EventHandler
envoyFilterHandlers []istiomodel.EventHandler
options common.Options
mutex sync.RWMutex
// key: namespace/name
ingresses map[string]*ingress.Ingress
ingressInformer informerfactory.StartableInformer
ingressLister networkinglister.IngressLister
serviceInformer informerfactory.StartableInformer
serviceLister listerv1.ServiceLister
classInformer informerfactory.StartableInformer
classLister networkinglister.IngressClassLister
secretController secret.SecretController
statusSyncer *statusSyncer
}
// NewController creates a new Kubernetes controller
func NewController(localKubeClient, client kubeclient.Client, options common.Options, secretController secret.SecretController) common.IngressController {
opts := ktypes.InformerOptions{Namespace: options.WatchNamespace}
ingressInformer := schemakubeclient.GetInformerFilteredFromGVR(client, opts, gvr.Ingress)
ingressLister := networkinglister.NewIngressLister(ingressInformer.Informer.GetIndexer())
serviceInformer := schemakubeclient.GetInformerFilteredFromGVR(client, opts, gvr.Service)
serviceLister := listerv1.NewServiceLister(serviceInformer.Informer.GetIndexer())
classInformer := schemakubeclient.GetInformerFilteredFromGVR(client, opts, gvr.IngressClass)
classLister := networkinglister.NewIngressClassLister(classInformer.Informer.GetIndexer())
c := &controller{
options: options,
ingresses: make(map[string]*ingress.Ingress),
ingressInformer: ingressInformer,
ingressLister: ingressLister,
classInformer: classInformer,
classLister: classLister,
serviceInformer: serviceInformer,
serviceLister: serviceLister,
secretController: secretController,
}
c.queue = controllers.NewQueue("ingressv1",
controllers.WithReconciler(c.onEvent),
controllers.WithMaxAttempts(5))
_, _ = c.ingressInformer.Informer.AddEventHandler(controllers.ObjectHandler(c.queue.AddObject))
if options.EnableStatus {
c.statusSyncer = newStatusSyncer(localKubeClient, client, c, options.SystemNamespace, ingressLister, serviceLister)
} else {
IngressLog.Infof("Disable status update for cluster %s", options.ClusterId)
}
return c
}
func (c *controller) ServiceLister() listerv1.ServiceLister {
return c.serviceLister
}
func (c *controller) SecretLister() listerv1.SecretLister {
return c.secretController.Lister()
}
func (c *controller) Run(stop <-chan struct{}) {
if c.statusSyncer != nil {
go c.statusSyncer.run(stop)
}
go c.secretController.Run(stop)
defer utilruntime.HandleCrash()
if !cache.WaitForCacheSync(stop, c.informerSynced) {
IngressLog.Errorf("Failed to sync ingress controller cache for cluster %s", c.options.ClusterId)
return
}
c.queue.Run(stop)
}
func (c *controller) onEvent(namespacedName types.NamespacedName) error {
event := istiomodel.EventUpdate
ing, err := c.ingressLister.Ingresses(namespacedName.Namespace).Get(namespacedName.Name)
if err != nil {
if kerrors.IsNotFound(err) {
event = istiomodel.EventDelete
c.mutex.Lock()
ing = c.ingresses[namespacedName.String()]
delete(c.ingresses, namespacedName.String())
c.mutex.Unlock()
} else {
IngressLog.Warnf("ingressLister Get failed, ingress: %s, err: %v", namespacedName, err)
return err
}
}
// ingress deleted, and it is not processed before
if ing == nil {
return nil
}
IngressLog.Infof("ingress: %s, event: %s", namespacedName, event)
// we should check need process only when event is not delete,
// if it is delete event, and previously processed, we need to process too.
if event != istiomodel.EventDelete {
shouldProcess, err := c.shouldProcessIngressUpdate(ing)
if err != nil {
return err
}
if !shouldProcess {
IngressLog.Infof("no need process, ingress: %s", namespacedName)
return nil
}
}
drmetadata := config.Meta{
Name: ing.Name + "-" + "destinationrule",
Namespace: ing.Namespace,
GroupVersionKind: gvk.DestinationRule,
// Set this label so that we do not compare configs and just push.
Labels: map[string]string{constants.AlwaysPushLabel: "true"},
}
vsmetadata := config.Meta{
Name: ing.Name + "-" + "virtualservice",
Namespace: ing.Namespace,
GroupVersionKind: gvk.VirtualService,
// Set this label so that we do not compare configs and just push.
Labels: map[string]string{constants.AlwaysPushLabel: "true"},
}
efmetadata := config.Meta{
Name: ing.Name + "-" + "envoyfilter",
Namespace: ing.Namespace,
GroupVersionKind: gvk.EnvoyFilter,
// Set this label so that we do not compare configs and just push.
Labels: map[string]string{constants.AlwaysPushLabel: "true"},
}
gatewaymetadata := config.Meta{
Name: ing.Name + "-" + "gateway",
Namespace: ing.Namespace,
GroupVersionKind: gvk.Gateway,
// Set this label so that we do not compare configs and just push.
Labels: map[string]string{constants.AlwaysPushLabel: "true"},
}
for _, f := range c.destinationRuleHandlers {
f(config.Config{Meta: drmetadata}, config.Config{Meta: drmetadata}, event)
}
for _, f := range c.virtualServiceHandlers {
f(config.Config{Meta: vsmetadata}, config.Config{Meta: vsmetadata}, event)
}
for _, f := range c.envoyFilterHandlers {
f(config.Config{Meta: efmetadata}, config.Config{Meta: efmetadata}, event)
}
for _, f := range c.gatewayHandlers {
f(config.Config{Meta: gatewaymetadata}, config.Config{Meta: gatewaymetadata}, event)
}
return nil
}
func (c *controller) RegisterEventHandler(kind config.GroupVersionKind, f istiomodel.EventHandler) {
switch kind {
case gvk.VirtualService:
c.virtualServiceHandlers = append(c.virtualServiceHandlers, f)
case gvk.Gateway:
c.gatewayHandlers = append(c.gatewayHandlers, f)
case gvk.DestinationRule:
c.destinationRuleHandlers = append(c.destinationRuleHandlers, f)
case gvk.EnvoyFilter:
c.envoyFilterHandlers = append(c.envoyFilterHandlers, f)
}
}
func (c *controller) SetWatchErrorHandler(handler func(r *cache.Reflector, err error)) error {
var errs error
if err := c.serviceInformer.Informer.SetWatchErrorHandler(handler); err != nil {
errs = multierror.Append(errs, err)
}
if err := c.ingressInformer.Informer.SetWatchErrorHandler(handler); err != nil {
errs = multierror.Append(errs, err)
}
if err := c.secretController.Informer().SetWatchErrorHandler(handler); err != nil {
errs = multierror.Append(errs, err)
}
if err := c.classInformer.Informer.SetWatchErrorHandler(handler); err != nil {
errs = multierror.Append(errs, err)
}
return errs
}
func (c *controller) informerSynced() bool {
return c.ingressInformer.Informer.HasSynced() && c.serviceInformer.Informer.HasSynced() &&
c.classInformer.Informer.HasSynced()
}
func (c *controller) HasSynced() bool {
return c.queue.HasSynced() && c.secretController.HasSynced()
}
func (c *controller) List() []config.Config {
out := make([]config.Config, 0, len(c.ingresses))
for _, raw := range c.ingressInformer.Informer.GetStore().List() {
ing, ok := raw.(*ingress.Ingress)
if !ok {
IngressLog.Warnf("get ingress from informer failed: %v", raw)
continue
}
should, err := c.shouldProcessIngress(ing)
if err != nil {
IngressLog.Warnf("check should process ingress failed: %v", err)
continue
}
if !should {
IngressLog.Debugf("no need process ingress: %s/%s", ing.Namespace, ing.Name)
continue
}
copiedConfig := ing.DeepCopy()
setDefaultMSEIngressOptionalField(copiedConfig)
outConfig := config.Config{
Meta: config.Meta{
Name: copiedConfig.Name,
Namespace: copiedConfig.Namespace,
Annotations: common.CreateOrUpdateAnnotations(copiedConfig.Annotations, c.options),
Labels: copiedConfig.Labels,
CreationTimestamp: copiedConfig.CreationTimestamp.Time,
},
Spec: copiedConfig.Spec,
}
out = append(out, outConfig)
}
common.RecordIngressNumber(c.options.ClusterId, len(out))
return out
}
func extractTLSSecretName(host string, tls []ingress.IngressTLS) string {
if len(tls) == 0 {
return ""
}
for _, t := range tls {
match := false
for _, h := range t.Hosts {
if h == host {
match = true
}
}
if match {
return t.SecretName
}
}
return ""
}
func (c *controller) ConvertGateway(convertOptions *common.ConvertOptions, wrapper *common.WrapperConfig, httpsCredentialConfig *cert.Config) error {
// Ignore canary config.
if wrapper.AnnotationsConfig.IsCanary() {
return nil
}
cfg := wrapper.Config
ingressV1, ok := cfg.Spec.(ingress.IngressSpec)
if !ok {
common.IncrementInvalidIngress(c.options.ClusterId, common.Unknown)
return fmt.Errorf("convert type is invalid in cluster %s", c.options.ClusterId)
}
if len(ingressV1.Rules) == 0 && ingressV1.DefaultBackend == nil {
common.IncrementInvalidIngress(c.options.ClusterId, common.EmptyRule)
return fmt.Errorf("invalid ingress rule %s:%s in cluster %s, either `defaultBackend` or `rules` must be specified", cfg.Namespace, cfg.Name, c.options.ClusterId)
}
for _, rule := range ingressV1.Rules {
// Need create builder for every rule.
domainBuilder := &common.IngressDomainBuilder{
ClusterId: c.options.ClusterId,
Protocol: common.HTTP,
Host: rule.Host,
Ingress: cfg,
Event: common.Normal,
}
// Extract the previous gateway and builder
wrapperGateway, exist := convertOptions.Gateways[rule.Host]
preDomainBuilder, _ := convertOptions.IngressDomainCache.Valid[rule.Host]
if !exist {
wrapperGateway = &common.WrapperGateway{
Gateway: &networking.Gateway{},
WrapperConfig: wrapper,
ClusterId: c.options.ClusterId,
Host: rule.Host,
}
if c.options.GatewaySelectorKey != "" {
wrapperGateway.Gateway.Selector = map[string]string{c.options.GatewaySelectorKey: c.options.GatewaySelectorValue}
}
wrapperGateway.Gateway.Servers = append(wrapperGateway.Gateway.Servers, &networking.Server{
Port: &networking.Port{
Number: c.options.GatewayHttpPort,
Protocol: string(protocol.HTTP),
Name: common.CreateConvertedName("http-"+strconv.FormatUint(uint64(c.options.GatewayHttpPort), 10)+"-ingress", string(c.options.ClusterId)),
},
Hosts: []string{rule.Host},
})
// Add new gateway, builder
convertOptions.Gateways[rule.Host] = wrapperGateway
convertOptions.IngressDomainCache.Valid[rule.Host] = domainBuilder
} else {
// Fallback to get downstream tls from current ingress.
if wrapperGateway.WrapperConfig.AnnotationsConfig.DownstreamTLS == nil {
wrapperGateway.WrapperConfig.AnnotationsConfig.DownstreamTLS = wrapper.AnnotationsConfig.DownstreamTLS
}
}
// There are no tls settings, so just skip.
if len(ingressV1.TLS) == 0 {
continue
}
// Get tls secret matching the rule host
secretName := extractTLSSecretName(rule.Host, ingressV1.TLS)
secretNamespace := cfg.Namespace
if secretName != "" {
if httpsCredentialConfig != nil && httpsCredentialConfig.FallbackForInvalidSecret {
_, err := c.secretController.Lister().Secrets(secretNamespace).Get(secretName)
if err != nil {
if k8serrors.IsNotFound(err) {
// If there is no matching secret, try to get it from configmap.
matchSecretName := httpsCredentialConfig.MatchSecretNameByDomain(rule.Host)
if matchSecretName != "" {
namespace, secret := cert.ParseTLSSecret(matchSecretName)
if namespace == "" {
secretNamespace = c.options.SystemNamespace
} else {
secretNamespace = namespace
}
secretName = secret
}
}
}
}
} else {
// If there is no matching secret, try to get it from configmap.
if httpsCredentialConfig != nil {
secretName = httpsCredentialConfig.MatchSecretNameByDomain(rule.Host)
secretNamespace = c.options.SystemNamespace
namespace, secret := cert.ParseTLSSecret(secretName)
if namespace != "" {
secretNamespace = namespace
secretName = secret
}
}
}
if secretName == "" {
// There no matching secret, so just skip.
continue
}
domainBuilder.Protocol = common.HTTPS
domainBuilder.SecretName = path.Join(c.options.ClusterId.String(), cfg.Namespace, secretName)
// There is a matching secret and the gateway has already a tls secret.
// We should report the duplicated tls secret event.
if wrapperGateway.IsHTTPS() {
domainBuilder.Event = common.DuplicatedTls
domainBuilder.PreIngress = preDomainBuilder.Ingress
convertOptions.IngressDomainCache.Invalid = append(convertOptions.IngressDomainCache.Invalid,
domainBuilder.Build())
continue
}
// Append https server
wrapperGateway.Gateway.Servers = append(wrapperGateway.Gateway.Servers, &networking.Server{
Port: &networking.Port{
Number: uint32(c.options.GatewayHttpsPort),
Protocol: string(protocol.HTTPS),
Name: common.CreateConvertedName("https-"+strconv.FormatUint(uint64(c.options.GatewayHttpsPort), 10)+"-ingress", string(c.options.ClusterId)),
},
Hosts: []string{rule.Host},
Tls: &networking.ServerTLSSettings{
Mode: networking.ServerTLSSettings_SIMPLE,
CredentialName: credentials.ToKubernetesIngressResource(c.options.RawClusterId, secretNamespace, secretName),
},
})
// Update domain builder
convertOptions.IngressDomainCache.Valid[rule.Host] = domainBuilder
}
return nil
}
func (c *controller) ConvertHTTPRoute(convertOptions *common.ConvertOptions, wrapper *common.WrapperConfig) error {
// Canary ingress will be processed in the end.
if wrapper.AnnotationsConfig.IsCanary() {
convertOptions.CanaryIngresses = append(convertOptions.CanaryIngresses, wrapper)
return nil
}
cfg := wrapper.Config
ingressV1, ok := cfg.Spec.(ingress.IngressSpec)
if !ok {
common.IncrementInvalidIngress(c.options.ClusterId, common.Unknown)
return fmt.Errorf("convert type is invalid in cluster %s", c.options.ClusterId)
}
if len(ingressV1.Rules) == 0 && ingressV1.DefaultBackend == nil {
common.IncrementInvalidIngress(c.options.ClusterId, common.EmptyRule)
return fmt.Errorf("invalid ingress rule %s:%s in cluster %s, either `defaultBackend` or `rules` must be specified", cfg.Namespace, cfg.Name, c.options.ClusterId)
}
if ingressV1.DefaultBackend != nil &&
((ingressV1.DefaultBackend.Service != nil &&
ingressV1.DefaultBackend.Service.Name != "") ||
ingressV1.DefaultBackend.Resource != nil) {
convertOptions.HasDefaultBackend = true
}
// In one ingress, we will limit the rule conflict.
// When the host, pathType, path of two rule are same, we think there is a conflict event.
definedRules := sets.New[string]()
var (
// But in across ingresses case, we will restrict this limit.
// When the {host, path, headers, method, params} of two rule in different ingress are same, we think there is a conflict event.
tempRuleKey []string
)
for _, rule := range ingressV1.Rules {
if rule.HTTP == nil || len(rule.HTTP.Paths) == 0 {
IngressLog.Warnf("invalid ingress rule %s:%s for host %q in cluster %s, no paths defined", cfg.Namespace, cfg.Name, rule.Host, c.options.ClusterId)
continue
}
wrapperVS, exist := convertOptions.VirtualServices[rule.Host]
if !exist {
wrapperVS = &common.WrapperVirtualService{
VirtualService: &networking.VirtualService{
Hosts: []string{rule.Host},
},
WrapperConfig: wrapper,
}
convertOptions.VirtualServices[rule.Host] = wrapperVS
}
// Record the latest app root for per host.
redirect := wrapper.AnnotationsConfig.Redirect
if redirect != nil && redirect.AppRoot != "" {
wrapperVS.AppRoot = redirect.AppRoot
}
wrapperHttpRoutes := make([]*common.WrapperHTTPRoute, 0, len(rule.HTTP.Paths))
for _, httpPath := range rule.HTTP.Paths {
wrapperHttpRoute := &common.WrapperHTTPRoute{
HTTPRoute: &networking.HTTPRoute{},
WrapperConfig: wrapper,
Host: rule.Host,
ClusterId: c.options.ClusterId,
}
var pathType common.PathType
originPath := httpPath.Path
if annotationsConfig := wrapper.AnnotationsConfig; annotationsConfig.NeedRegexMatch(originPath) {
if annotationsConfig.IsFullPathRegexMatch() {
pathType = common.FullPathRegex
} else {
pathType = common.PrefixRegex
}
} else {
switch *httpPath.PathType {
case ingress.PathTypeExact:
pathType = common.Exact
case ingress.PathTypePrefix:
pathType = common.Prefix
if httpPath.Path != "/" {
originPath = strings.TrimSuffix(httpPath.Path, "/")
}
}
}
wrapperHttpRoute.OriginPath = originPath
wrapperHttpRoute.OriginPathType = pathType
wrapperHttpRoute.HTTPRoute.Match = c.generateHttpMatches(pathType, httpPath.Path, wrapperVS)
wrapperHttpRoute.HTTPRoute.Name = common.GenerateUniqueRouteName(c.options.SystemNamespace, wrapperHttpRoute)
ingressRouteBuilder := convertOptions.IngressRouteCache.New(wrapperHttpRoute)
hostAndPath := wrapperHttpRoute.PathFormat()
key := createRuleKey(cfg.Annotations, hostAndPath)
wrapperHttpRoute.RuleKey = key
if WrapPreIngress, exist := convertOptions.Route2Ingress[key]; exist {
ingressRouteBuilder.PreIngress = WrapPreIngress.Config
ingressRouteBuilder.Event = common.DuplicatedRoute
}
tempRuleKey = append(tempRuleKey, key)
// Two duplicated rules in the same ingress.
if ingressRouteBuilder.Event == common.Normal {
pathFormat := wrapperHttpRoute.PathFormat()
if definedRules.Contains(pathFormat) {
ingressRouteBuilder.PreIngress = cfg
ingressRouteBuilder.Event = common.DuplicatedRoute
}
definedRules.Insert(pathFormat)
}
// backend service check
var event common.Event
destinationConfig := wrapper.AnnotationsConfig.Destination
wrapperHttpRoute.HTTPRoute.Route, event = c.backendToRouteDestination(&httpPath.Backend, cfg.Namespace, ingressRouteBuilder, destinationConfig)
if destinationConfig != nil {
wrapperHttpRoute.WeightTotal = int32(destinationConfig.WeightSum)
}
if ingressRouteBuilder.Event != common.Normal {
event = ingressRouteBuilder.Event
}
if event != common.Normal {
common.IncrementInvalidIngress(c.options.ClusterId, event)
ingressRouteBuilder.Event = event
} else {
wrapperHttpRoutes = append(wrapperHttpRoutes, wrapperHttpRoute)
}
convertOptions.IngressRouteCache.Add(ingressRouteBuilder)
}
for idx, item := range tempRuleKey {
if val, exist := convertOptions.Route2Ingress[item]; !exist || strings.Compare(val.RuleKey, tempRuleKey[idx]) != 0 {
convertOptions.Route2Ingress[item] = &common.WrapperConfigWithRuleKey{
Config: cfg,
RuleKey: tempRuleKey[idx],
}
}
}
old, f := convertOptions.HTTPRoutes[rule.Host]
if f {
old = append(old, wrapperHttpRoutes...)
convertOptions.HTTPRoutes[rule.Host] = old
} else {
convertOptions.HTTPRoutes[rule.Host] = wrapperHttpRoutes
}
}
return nil
}
func (c *controller) generateHttpMatches(pathType common.PathType, path string, wrapperVS *common.WrapperVirtualService) []*networking.HTTPMatchRequest {
var httpMatches []*networking.HTTPMatchRequest
httpMatch := &networking.HTTPMatchRequest{}
switch pathType {
case common.PrefixRegex:
httpMatch.Uri = &networking.StringMatch{
MatchType: &networking.StringMatch_Regex{Regex: path + ".*"},
}
case common.FullPathRegex:
httpMatch.Uri = &networking.StringMatch{
MatchType: &networking.StringMatch_Regex{Regex: path + "$"},
}
case common.Exact:
httpMatch.Uri = &networking.StringMatch{
MatchType: &networking.StringMatch_Exact{Exact: path},
}
case common.Prefix:
if path == "/" {
if wrapperVS != nil {
wrapperVS.ConfiguredDefaultBackend = true
}
// Optimize common case of / to not needed regex
httpMatch.Uri = &networking.StringMatch{
MatchType: &networking.StringMatch_Prefix{Prefix: path},
}
} else {
newPath := strings.TrimSuffix(path, "/")
httpMatches = append(httpMatches, c.generateHttpMatches(common.Exact, newPath, wrapperVS)...)
httpMatch.Uri = &networking.StringMatch{
MatchType: &networking.StringMatch_Prefix{Prefix: newPath + "/"},
}
}
}
httpMatches = append(httpMatches, httpMatch)
return httpMatches
}
func (c *controller) ApplyDefaultBackend(convertOptions *common.ConvertOptions, wrapper *common.WrapperConfig) error {
if wrapper.AnnotationsConfig.IsCanary() {
return nil
}
cfg := wrapper.Config
ingressV1, ok := cfg.Spec.(ingress.IngressSpec)
if !ok {
common.IncrementInvalidIngress(c.options.ClusterId, common.Unknown)
return fmt.Errorf("convert type is invalid in cluster %s", c.options.ClusterId)
}
if ingressV1.DefaultBackend == nil {
return nil
}
apply := func(host string, op func(vs *common.WrapperVirtualService, defaultRoute *common.WrapperHTTPRoute)) {
wirecardVS, exist := convertOptions.VirtualServices[host]
if !exist || !wirecardVS.ConfiguredDefaultBackend {
if !exist {
wirecardVS = &common.WrapperVirtualService{
VirtualService: &networking.VirtualService{
Hosts: []string{host},
},
WrapperConfig: wrapper,
}
convertOptions.VirtualServices[host] = wirecardVS
}
specDefaultBackend := c.createDefaultRoute(wrapper, ingressV1.DefaultBackend, host)
if specDefaultBackend != nil {
convertOptions.VirtualServices[host] = wirecardVS
op(wirecardVS, specDefaultBackend)
}
}
}
// First process *
apply("*", func(_ *common.WrapperVirtualService, defaultRoute *common.WrapperHTTPRoute) {
var hasFound bool
for _, httpRoute := range convertOptions.HTTPRoutes["*"] {
if httpRoute.OriginPathType == common.Prefix && httpRoute.OriginPath == "/" {
hasFound = true
convertOptions.IngressRouteCache.Delete(httpRoute)
httpRoute.HTTPRoute = defaultRoute.HTTPRoute
httpRoute.WrapperConfig = defaultRoute.WrapperConfig
convertOptions.IngressRouteCache.NewAndAdd(httpRoute)
}
}
if !hasFound {
convertOptions.HTTPRoutes["*"] = append(convertOptions.HTTPRoutes["*"], defaultRoute)
}
})
for _, rule := range ingressV1.Rules {
if rule.Host == "*" {
continue
}
apply(rule.Host, func(vs *common.WrapperVirtualService, defaultRoute *common.WrapperHTTPRoute) {
convertOptions.HTTPRoutes[rule.Host] = append(convertOptions.HTTPRoutes[rule.Host], defaultRoute)
vs.ConfiguredDefaultBackend = true
convertOptions.IngressRouteCache.NewAndAdd(defaultRoute)
})
}
return nil
}
func (c *controller) ApplyCanaryIngress(convertOptions *common.ConvertOptions, wrapper *common.WrapperConfig) error {
byHeader, _ := wrapper.AnnotationsConfig.CanaryKind()
cfg := wrapper.Config
ingressV1, ok := cfg.Spec.(ingress.IngressSpec)
if !ok {
common.IncrementInvalidIngress(c.options.ClusterId, common.Unknown)
return fmt.Errorf("convert type is invalid in cluster %s", c.options.ClusterId)
}
if len(ingressV1.Rules) == 0 && ingressV1.DefaultBackend == nil {
common.IncrementInvalidIngress(c.options.ClusterId, common.EmptyRule)
return fmt.Errorf("invalid ingress rule %s:%s in cluster %s, either `defaultBackend` or `rules` must be specified", cfg.Namespace, cfg.Name, c.options.ClusterId)
}
for _, rule := range ingressV1.Rules {
if rule.HTTP == nil || len(rule.HTTP.Paths) == 0 {
IngressLog.Warnf("invalid ingress rule %s:%s for host %q in cluster %s, no paths defined", cfg.Namespace, cfg.Name, rule.Host, c.options.ClusterId)
continue
}
routes, exist := convertOptions.HTTPRoutes[rule.Host]
if !exist {
continue
}
for _, httpPath := range rule.HTTP.Paths {
canary := &common.WrapperHTTPRoute{
HTTPRoute: &networking.HTTPRoute{},
WrapperConfig: wrapper,
Host: rule.Host,
ClusterId: c.options.ClusterId,
}
var pathType common.PathType
originPath := httpPath.Path
if annotationsConfig := wrapper.AnnotationsConfig; annotationsConfig.NeedRegexMatch(originPath) {
if annotationsConfig.IsFullPathRegexMatch() {
pathType = common.FullPathRegex
} else {
pathType = common.PrefixRegex
}
} else {
switch *httpPath.PathType {
case ingress.PathTypeExact:
pathType = common.Exact
case ingress.PathTypePrefix:
pathType = common.Prefix
if httpPath.Path != "/" {
originPath = strings.TrimSuffix(httpPath.Path, "/")
}
}
}
canary.OriginPath = originPath
canary.OriginPathType = pathType
ingressRouteBuilder := convertOptions.IngressRouteCache.New(canary)
// backend service check
var event common.Event
destinationConfig := wrapper.AnnotationsConfig.Destination
canary.HTTPRoute.Route, event = c.backendToRouteDestination(&httpPath.Backend, cfg.Namespace, ingressRouteBuilder, destinationConfig)
if event != common.Normal {
common.IncrementInvalidIngress(c.options.ClusterId, event)
ingressRouteBuilder.Event = event
convertOptions.IngressRouteCache.Add(ingressRouteBuilder)
continue
}
canary.RuleKey = createRuleKey(canary.WrapperConfig.Config.Annotations, canary.PathFormat())
// find the base ingress
pos := 0
var targetRoute *common.WrapperHTTPRoute
for _, route := range routes {
if isCanaryRoute(canary, route) {
targetRoute = route
break
}
pos += 1
}
if targetRoute == nil {
continue
}
canaryConfig := wrapper.AnnotationsConfig.Canary
// Header, Cookie
if byHeader {
IngressLog.Debug("Insert canary route by header")
annotations.ApplyByHeader(canary.HTTPRoute, targetRoute.HTTPRoute, canary.WrapperConfig.AnnotationsConfig)
canary.HTTPRoute.Name = common.GenerateUniqueRouteName(c.options.SystemNamespace, canary)
} else {
IngressLog.Debug("Merge canary route by weight")
if targetRoute.WeightTotal == 0 {
targetRoute.WeightTotal = int32(canaryConfig.WeightTotal)
}
annotations.ApplyByWeight(canary.HTTPRoute, targetRoute.HTTPRoute, canary.WrapperConfig.AnnotationsConfig)
}
IngressLog.Debugf("Canary route is %v", canary)
if byHeader {
// Inherit policy from normal route
canary.WrapperConfig.AnnotationsConfig.Auth = targetRoute.WrapperConfig.AnnotationsConfig.Auth
routes = append(routes[:pos+1], routes[pos:]...)
routes[pos] = canary
convertOptions.HTTPRoutes[rule.Host] = routes
// Recreate route name.
ingressRouteBuilder.RouteName = common.GenerateUniqueRouteName(c.options.SystemNamespace, canary)
convertOptions.IngressRouteCache.Add(ingressRouteBuilder)
} else {
convertOptions.IngressRouteCache.Update(targetRoute)
}
}
}
return nil
}
func (c *controller) ConvertTrafficPolicy(convertOptions *common.ConvertOptions, wrapper *common.WrapperConfig) error {
if !wrapper.AnnotationsConfig.NeedTrafficPolicy() {
return nil
}
cfg := wrapper.Config
ingressV1, ok := cfg.Spec.(ingress.IngressSpec)
if !ok {
common.IncrementInvalidIngress(c.options.ClusterId, common.Unknown)
return fmt.Errorf("convert type is invalid in cluster %s", c.options.ClusterId)
}
if len(ingressV1.Rules) == 0 && ingressV1.DefaultBackend == nil {
common.IncrementInvalidIngress(c.options.ClusterId, common.EmptyRule)
return fmt.Errorf("invalid ingress rule %s:%s in cluster %s, either `defaultBackend` or `rules` must be specified", cfg.Namespace, cfg.Name, c.options.ClusterId)
}
if ingressV1.DefaultBackend != nil {
err := c.storeBackendTrafficPolicy(wrapper, ingressV1.DefaultBackend, convertOptions.Service2TrafficPolicy)
if err != nil {
IngressLog.Errorf("ignore default service within ingress %s/%s, since error:%v", cfg.Namespace, cfg.Name, err)
}
}
for _, rule := range ingressV1.Rules {
if rule.HTTP == nil || len(rule.HTTP.Paths) == 0 {
continue
}
for _, httpPath := range rule.HTTP.Paths {
err := c.storeBackendTrafficPolicy(wrapper, &httpPath.Backend, convertOptions.Service2TrafficPolicy)
if err != nil {
IngressLog.Errorf("ignore service within ingress %s/%s, since error:%v", cfg.Namespace, cfg.Name, err)
}
}
}
return nil
}
func (c *controller) storeBackendTrafficPolicy(wrapper *common.WrapperConfig, backend *ingress.IngressBackend, store map[common.ServiceKey]*common.WrapperTrafficPolicy) error {
if backend == nil {
return errors.New("invalid empty backend")
}
if common.ValidateBackendResource(backend.Resource) && wrapper.AnnotationsConfig.Destination != nil {
for _, dest := range wrapper.AnnotationsConfig.Destination.McpDestination {
portNumber := dest.Destination.GetPort().GetNumber()
serviceKey := common.CreateMcpServiceKey(dest.Destination.Host, int32(portNumber))
if _, exist := store[serviceKey]; !exist {
if serviceKey.Port != 0 {
store[serviceKey] = &common.WrapperTrafficPolicy{
PortTrafficPolicy: &networking.TrafficPolicy_PortTrafficPolicy{
Port: &networking.PortSelector{
Number: uint32(serviceKey.Port),
},
},
WrapperConfig: wrapper,
}
} else {
store[serviceKey] = &common.WrapperTrafficPolicy{
TrafficPolicy: &networking.TrafficPolicy{},
WrapperConfig: wrapper,
}
}
}
}
} else {
if backend.Service == nil {
return nil
}
serviceKey, err := c.createServiceKey(backend.Service, wrapper.Config.Namespace)
if err != nil {
return fmt.Errorf("ignore service %s within ingress %s/%s", serviceKey.Name, wrapper.Config.Namespace, wrapper.Config.Name)
}
if _, exist := store[serviceKey]; !exist {
store[serviceKey] = &common.WrapperTrafficPolicy{
PortTrafficPolicy: &networking.TrafficPolicy_PortTrafficPolicy{
Port: &networking.PortSelector{
Number: uint32(serviceKey.Port),
},
},
WrapperConfig: wrapper,
}
}
}
return nil
}
func (c *controller) createDefaultRoute(wrapper *common.WrapperConfig, backend *ingress.IngressBackend, host string) *common.WrapperHTTPRoute {
if backend == nil {
return nil
}
var routeDestination []*networking.HTTPRouteDestination
if common.ValidateBackendResource(backend.Resource) {
routeDestination = wrapper.AnnotationsConfig.Destination.McpDestination
} else {
service := backend.Service
namespace := wrapper.Config.Namespace
port := &networking.PortSelector{}
if service.Port.Number > 0 {
port.Number = uint32(service.Port.Number)
} else {
resolvedPort, err := resolveNamedPort(service, namespace, c.serviceLister)
if err != nil {
return nil
}
port.Number = uint32(resolvedPort)
}
routeDestination = []*networking.HTTPRouteDestination{
{
Destination: &networking.Destination{
Host: util.CreateServiceFQDN(namespace, service.Name),
Port: port,
},
Weight: 100,
},
}
}
route := &common.WrapperHTTPRoute{
HTTPRoute: &networking.HTTPRoute{
Route: routeDestination,
},
WrapperConfig: wrapper,
ClusterId: c.options.ClusterId,
Host: host,
IsDefaultBackend: true,
OriginPathType: common.Prefix,
OriginPath: "/",
}
route.HTTPRoute.Name = common.GenerateUniqueRouteNameWithSuffix(c.options.SystemNamespace, route, "default")
return route
}
func (c *controller) createServiceKey(service *ingress.IngressServiceBackend, namespace string) (common.ServiceKey, error) {
serviceKey := common.ServiceKey{}
if service == nil || service.Name == "" {
return serviceKey, errors.New("service name is empty")
}
var port int32
var err error
if service.Port.Number > 0 {
port = service.Port.Number
} else {
port, err = resolveNamedPort(service, namespace, c.serviceLister)
if err != nil {
return serviceKey, err
}
}
return common.ServiceKey{
Namespace: namespace,
Name: service.Name,
Port: port,
}, nil
}
func isCanaryRoute(canary, route *common.WrapperHTTPRoute) bool {
return !route.WrapperConfig.AnnotationsConfig.IsCanary() && canary.RuleKey == route.RuleKey
}
func (c *controller) backendToRouteDestination(backend *ingress.IngressBackend, namespace string,
builder *common.IngressRouteBuilder, config *annotations.DestinationConfig) ([]*networking.HTTPRouteDestination, common.Event) {
if backend == nil || (backend.Service == nil && backend.Resource == nil) {
return nil, common.InvalidBackendService
}
if backend.Service == nil {
if config != nil {
return config.McpDestination, common.Normal
}
return nil, common.InvalidBackendService
}
service := backend.Service
builder.PortName = service.Port.Name
port := &networking.PortSelector{}
if service.Port.Number > 0 {
port.Number = uint32(service.Port.Number)
} else {
resolvedPort, err := resolveNamedPort(service, namespace, c.serviceLister)
if err != nil {
return nil, common.PortNameResolveError
}
port.Number = uint32(resolvedPort)
}
builder.ServiceList = []model.BackendService{
{
Namespace: namespace,
Name: service.Name,
Port: port.Number,
Weight: 100,
},
}
return []*networking.HTTPRouteDestination{
{
Destination: &networking.Destination{
Host: util.CreateServiceFQDN(namespace, service.Name),
Port: port,
},
Weight: 100,
},
}, common.Normal
}
func resolveNamedPort(service *ingress.IngressServiceBackend, namespace string, serviceLister listerv1.ServiceLister) (int32, error) {
svc, err := serviceLister.Services(namespace).Get(service.Name)
if err != nil {
return 0, err
}
for _, port := range svc.Spec.Ports {
if port.Name == service.Port.Name {
return port.Port, nil
}
}
return 0, common.ErrNotFound
}
func (c *controller) shouldProcessIngressWithClass(ingress *ingress.Ingress, ingressClass *ingress.IngressClass) bool {
if class, exists := ingress.Annotations[util.IngressClassAnnotation]; exists {
switch c.options.IngressClass {
case "":
return true
case common.DefaultIngressClass:
return class == "" || class == common.DefaultIngressClass
default:
return c.options.IngressClass == class
}
} else if ingressClass != nil {
switch c.options.IngressClass {
case "":
return true
default:
return c.options.IngressClass == ingressClass.Name
}
} else {
ingressClassName := ingress.Spec.IngressClassName
switch c.options.IngressClass {
case "":
return true
case common.DefaultIngressClass:
return ingressClassName == nil || *ingressClassName == "" ||
*ingressClassName == common.DefaultIngressClass
default:
return ingressClassName != nil && *ingressClassName == c.options.IngressClass
}
}
}
func (c *controller) shouldProcessIngress(i *ingress.Ingress) (bool, error) {
var class *ingress.IngressClass
if c.classLister != nil && i.Spec.IngressClassName != nil {
classCache, err := c.classLister.Get(*i.Spec.IngressClassName)
if err != nil && !kerrors.IsNotFound(err) {
return false, fmt.Errorf("failed to get ingress class %v from cluster %s: %v", i.Spec.IngressClassName, c.options.ClusterId, err)
}
class = classCache
}
// first check ingress class
if c.shouldProcessIngressWithClass(i, class) {
// then check namespace
switch c.options.WatchNamespace {
case "":
return true, nil
default:
return c.options.WatchNamespace == i.Namespace, nil
}
}
return false, nil
}
// shouldProcessIngressUpdate checks whether we should renotify registered handlers about an update event
func (c *controller) shouldProcessIngressUpdate(ing *ingress.Ingress) (bool, error) {
shouldProcess, err := c.shouldProcessIngress(ing)
if err != nil {
return false, err
}
namespacedName := ing.Namespace + "/" + ing.Name
if shouldProcess {
// record processed ingress
c.mutex.Lock()
preConfig, exist := c.ingresses[namespacedName]
c.ingresses[namespacedName] = ing
c.mutex.Unlock()
// We only care about annotations, labels and spec.
if exist {
if !reflect.DeepEqual(preConfig.Annotations, ing.Annotations) {
IngressLog.Debugf("Annotations of ingress %s changed, should process.", namespacedName)
return true, nil
}
if !reflect.DeepEqual(preConfig.Labels, ing.Labels) {
IngressLog.Debugf("Labels of ingress %s changed, should process.", namespacedName)
return true, nil
}
if !reflect.DeepEqual(preConfig.Spec, ing.Spec) {
IngressLog.Debugf("Spec of ingress %s changed, should process.", namespacedName)
return true, nil
}
return false, nil
}
IngressLog.Debugf("First receive relative ingress %s, should process.", namespacedName)
return true, nil
}
c.mutex.Lock()
_, preProcessed := c.ingresses[namespacedName]
// previous processed but should not currently, delete it
if preProcessed && !shouldProcess {
delete(c.ingresses, namespacedName)
}
c.mutex.Unlock()
return preProcessed, nil
}
// setDefaultMSEIngressOptionalField sets a default value for optional fields when is not defined.
func setDefaultMSEIngressOptionalField(ing *ingress.Ingress) {
for idx, tls := range ing.Spec.TLS {
if len(tls.Hosts) == 0 {
ing.Spec.TLS[idx].Hosts = []string{common.DefaultHost}
}
}
for idx, rule := range ing.Spec.Rules {
if rule.IngressRuleValue.HTTP == nil {
continue
}
if rule.Host == "" {
ing.Spec.Rules[idx].Host = common.DefaultHost
}
for innerIdx := range rule.IngressRuleValue.HTTP.Paths {
p := &rule.IngressRuleValue.HTTP.Paths[innerIdx]
if p.Path == "" {
p.Path = common.DefaultPath
}
if p.PathType == nil {
p.PathType = &defaultPathType
// for old k8s version
if !annotations.NeedRegexMatch(ing.Annotations) {
if strings.HasSuffix(p.Path, ".*") {
p.Path = strings.TrimSuffix(p.Path, ".*")
}
if strings.HasSuffix(p.Path, "/*") {
p.Path = strings.TrimSuffix(p.Path, "/*")
}
}
}
if *p.PathType == ingress.PathTypeImplementationSpecific {
p.PathType = &defaultPathType
}
}
}
}
// createRuleKey according to the pathType, path, methods, headers, params of rules
func createRuleKey(annots map[string]string, hostAndPath string) string {
var (
headers [][2]string
params [][2]string
sb strings.Builder
)
sep := "\n\n"
// path
sb.WriteString(hostAndPath)
sb.WriteString(sep)
// methods
if str, ok := annots[annotations.HigressAnnotationsPrefix+"/"+annotations.MatchMethod]; ok {
sb.WriteString(str)
}
sb.WriteString(sep)
start := len(annotations.HigressAnnotationsPrefix) + 1 // example: higress.io/exact-match-header-key: value
// headers && params
for k, val := range annots {
if idx := strings.Index(k, annotations.MatchHeader); idx != -1 {
key := k[start:idx] + k[idx+len(annotations.MatchHeader)+1:]
headers = append(headers, [2]string{key, val})
} else if idx := strings.Index(k, annotations.MatchPseudoHeader); idx != -1 {
key := k[start:idx] + ":" + k[idx+len(annotations.MatchPseudoHeader)+1:]
headers = append(headers, [2]string{key, val})
} else if idx := strings.Index(k, annotations.MatchQuery); idx != -1 {
key := k[start:idx] + k[idx+len(annotations.MatchQuery)+1:]
params = append(params, [2]string{key, val})
}
}
sort.SliceStable(headers, func(i, j int) bool {
return headers[i][0] < headers[j][0]
})
sort.SliceStable(params, func(i, j int) bool {
return params[i][0] < params[j][0]
})
for idx := range headers {
if idx != 0 {
sb.WriteByte('\n')
}
sb.WriteString(headers[idx][0])
sb.WriteByte('\t')
sb.WriteString(headers[idx][1])
}
sb.WriteString(sep)
for idx := range params {
if idx != 0 {
sb.WriteByte('\n')
}
sb.WriteString(params[idx][0])
sb.WriteByte('\t')
sb.WriteString(params[idx][1])
}
sb.WriteString(sep)
return sb.String()
}