pkg/k8scontext/context.go (806 lines of code) (raw):
// -------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
// --------------------------------------------------------------------------------------------
package k8scontext
import (
"context"
"fmt"
"sort"
"strings"
"time"
mapset "github.com/deckarep/golang-set"
"github.com/knative/pkg/apis/istio/v1alpha3"
v1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
networking "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/annotations"
agpoolv1beta1 "github.com/Azure/application-gateway-kubernetes-ingress/pkg/apis/azureapplicationgatewaybackendpool/v1beta1"
aginstv1beta1 "github.com/Azure/application-gateway-kubernetes-ingress/pkg/apis/azureapplicationgatewayinstanceupdatestatus/v1beta1"
agrewritev1beta1 "github.com/Azure/application-gateway-kubernetes-ingress/pkg/apis/azureapplicationgatewayrewrite/v1beta1"
prohibitedv1 "github.com/Azure/application-gateway-kubernetes-ingress/pkg/apis/azureingressprohibitedtarget/v1"
multiClusterIngress "github.com/Azure/application-gateway-kubernetes-ingress/pkg/apis/multiclusteringress/v1alpha1"
multiClusterService "github.com/Azure/application-gateway-kubernetes-ingress/pkg/apis/multiclusterservice/v1alpha1"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/azure"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/controllererrors"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/crd_client/agic_crd_client/clientset/versioned"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/crd_client/agic_crd_client/informers/externalversions"
multicluster_versioned "github.com/Azure/application-gateway-kubernetes-ingress/pkg/crd_client/azure_multicluster_crd_client/clientset/versioned"
multicluster_externalversions "github.com/Azure/application-gateway-kubernetes-ingress/pkg/crd_client/azure_multicluster_crd_client/informers/externalversions"
istio_versioned "github.com/Azure/application-gateway-kubernetes-ingress/pkg/crd_client/istio_crd_client/clientset/versioned"
istio_externalversions "github.com/Azure/application-gateway-kubernetes-ingress/pkg/crd_client/istio_crd_client/informers/externalversions"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/environment"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/events"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/k8scontext/convert"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/metricstore"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/sorter"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/utils"
)
const providerPrefix = "azure://"
const workBuffer = 1024
var namespacesToIgnore = map[string]interface{}{
"kube-system": nil,
"kube-public": nil,
}
// NewContext creates a context based on a Kubernetes client instance.
func NewContext(kubeClient kubernetes.Interface, crdClient versioned.Interface, multiClusterCrdClient multicluster_versioned.Interface, istioCrdClient istio_versioned.Interface, namespaces []string, resyncPeriod time.Duration, metricStore metricstore.MetricStore, envVariables environment.EnvVariables) *Context {
informerFactory := informers.NewSharedInformerFactory(kubeClient, resyncPeriod)
crdInformerFactory := externalversions.NewSharedInformerFactory(crdClient, resyncPeriod)
multiClusterCrdInformerFactory := multicluster_externalversions.NewSharedInformerFactory(multiClusterCrdClient, resyncPeriod)
istioCrdInformerFactory := istio_externalversions.NewSharedInformerFactoryWithOptions(istioCrdClient, resyncPeriod)
informerCollection := InformerCollection{
Endpoints: informerFactory.Core().V1().Endpoints().Informer(),
Pods: informerFactory.Core().V1().Pods().Informer(),
Secret: informerFactory.Core().V1().Secrets().Informer(),
Service: informerFactory.Core().V1().Services().Informer(),
AzureIngressProhibitedTarget: crdInformerFactory.Azureingressprohibitedtargets().V1().AzureIngressProhibitedTargets().Informer(),
AzureApplicationGatewayBackendPool: crdInformerFactory.Azureapplicationgatewaybackendpools().V1beta1().AzureApplicationGatewayBackendPools().Informer(),
AzureApplicationGatewayRewrite: crdInformerFactory.Azureapplicationgatewayrewrites().V1beta1().AzureApplicationGatewayRewrites().Informer(),
AzureApplicationGatewayInstanceUpdateStatus: crdInformerFactory.Azureapplicationgatewayinstanceupdatestatus().V1beta1().AzureApplicationGatewayInstanceUpdateStatuses().Informer(),
MultiClusterService: multiClusterCrdInformerFactory.Multiclusterservices().V1alpha1().MultiClusterServices().Informer(),
MultiClusterIngress: multiClusterCrdInformerFactory.Multiclusteringresses().V1alpha1().MultiClusterIngresses().Informer(),
IstioGateway: istioCrdInformerFactory.Networking().V1alpha3().Gateways().Informer(),
IstioVirtualService: istioCrdInformerFactory.Networking().V1alpha3().VirtualServices().Informer(),
}
if IsNetworkingV1PackageSupported {
informerCollection.Ingress = informerFactory.Networking().V1().Ingresses().Informer()
} else {
informerCollection.Ingress = informerFactory.Extensions().V1beta1().Ingresses().Informer()
}
cacheCollection := CacheCollection{
Endpoints: informerCollection.Endpoints.GetStore(),
Ingress: informerCollection.Ingress.GetStore(),
Pods: informerCollection.Pods.GetStore(),
Secret: informerCollection.Secret.GetStore(),
Service: informerCollection.Service.GetStore(),
AzureIngressProhibitedTarget: informerCollection.AzureIngressProhibitedTarget.GetStore(),
AzureApplicationGatewayBackendPool: informerCollection.AzureApplicationGatewayBackendPool.GetStore(),
AzureApplicationGatewayRewrite: informerCollection.AzureApplicationGatewayRewrite.GetStore(),
AzureApplicationGatewayInstanceUpdateStatus: informerCollection.AzureApplicationGatewayInstanceUpdateStatus.GetStore(),
MultiClusterService: informerCollection.MultiClusterService.GetStore(),
MultiClusterIngress: informerCollection.MultiClusterIngress.GetStore(),
IstioGateway: informerCollection.IstioGateway.GetStore(),
IstioVirtualService: informerCollection.IstioVirtualService.GetStore(),
}
context := &Context{
kubeClient: kubeClient,
crdClient: crdClient,
multiClusterCrdClient: multiClusterCrdClient,
istioCrdClient: istioCrdClient,
informers: &informerCollection,
ingressSecretsMap: utils.NewThreadsafeMultimap(),
Caches: &cacheCollection,
CertificateSecretStore: NewSecretStore(kubeClient),
Work: make(chan events.Event, workBuffer),
CacheSynced: make(chan interface{}),
MetricStore: metricStore,
namespaces: make(map[string]interface{}),
ingressClassControllerName: envVariables.IngressClassControllerName,
ingressClassResourceName: envVariables.IngressClassResourceName,
ingressClassResourceEnabled: envVariables.IngressClassResourceEnabled,
ingressClassResourceDefault: envVariables.IngressClassResourceDefault,
}
for _, ns := range namespaces {
context.namespaces[ns] = nil
}
h := handlers{context}
resourceHandler := cache.ResourceEventHandlerFuncs{
AddFunc: h.addFunc,
UpdateFunc: h.updateFunc,
DeleteFunc: h.deleteFunc,
}
ingressResourceHandler := cache.ResourceEventHandlerFuncs{
AddFunc: h.ingressAdd,
UpdateFunc: h.ingressUpdate,
DeleteFunc: h.ingressDelete,
}
secretResourceHandler := cache.ResourceEventHandlerFuncs{
AddFunc: h.secretAdd,
UpdateFunc: h.secretUpdate,
DeleteFunc: h.secretDelete,
}
// Register event handlers.
informerCollection.Endpoints.AddEventHandler(resourceHandler)
informerCollection.Ingress.AddEventHandler(ingressResourceHandler)
informerCollection.Pods.AddEventHandler(resourceHandler)
informerCollection.Secret.AddEventHandler(secretResourceHandler)
informerCollection.Service.AddEventHandler(resourceHandler)
informerCollection.AzureIngressProhibitedTarget.AddEventHandler(resourceHandler)
informerCollection.AzureApplicationGatewayRewrite.AddEventHandler(resourceHandler)
informerCollection.AzureApplicationGatewayBackendPool.AddEventHandler(resourceHandler)
informerCollection.AzureApplicationGatewayInstanceUpdateStatus.AddEventHandler(resourceHandler)
informerCollection.MultiClusterService.AddEventHandler(resourceHandler)
informerCollection.MultiClusterIngress.AddEventHandler(resourceHandler)
if IsNetworkingV1PackageSupported {
informerCollection.IngressClass = informerFactory.Networking().V1().IngressClasses().Informer()
informerCollection.IngressClass.AddEventHandler(resourceHandler)
cacheCollection.IngressClass = informerCollection.IngressClass.GetStore()
}
return context
}
// Run executes informer collection.
func (c *Context) Run(stopChannel chan struct{}, omitCRDs bool, envVariables environment.EnvVariables) error {
klog.V(1).Infoln("k8s context run started")
var hasSynced []cache.InformerSynced
if c.informers == nil {
e := controllererrors.NewError(
controllererrors.ErrorInformersNotInitialized,
"informers are not initialized",
)
c.MetricStore.IncErrorCount(e.Code)
return e
}
crds := map[cache.SharedInformer]interface{}{
c.informers.AzureIngressProhibitedTarget: nil,
c.informers.IstioGateway: nil,
c.informers.IstioVirtualService: nil,
c.informers.MultiClusterService: nil,
c.informers.MultiClusterIngress: nil,
c.informers.AzureApplicationGatewayRewrite: nil,
// c.informers.AzureApplicationGatewayBackendPool: nil,
// c.informers.AzureApplicationGatewayInstanceUpdateStatus: nil,
}
sharedInformers := []cache.SharedInformer{
c.informers.Endpoints,
c.informers.Pods,
c.informers.Service,
c.informers.Secret,
c.informers.Ingress,
c.informers.AzureApplicationGatewayRewrite,
//TODO: enabled by ccp feature flag
// c.informers.AzureApplicationGatewayBackendPool,
// c.informers.AzureApplicationGatewayInstanceUpdateStatus,
}
if IsNetworkingV1PackageSupported {
sharedInformers = append(sharedInformers, c.informers.IngressClass)
}
// For AGIC to watch for these CRDs the EnableBrownfieldDeploymentVarName env variable must be set to true
if envVariables.EnableBrownfieldDeployment {
sharedInformers = append(sharedInformers, c.informers.AzureIngressProhibitedTarget)
}
// For AGIC to watch for these CRDs the MultiClusterMode env variable must be set to true
if envVariables.MultiClusterMode {
sharedInformers = []cache.SharedInformer{} //only need to monitor 3 resources
sharedInformers = append(sharedInformers, c.informers.MultiClusterIngress)
sharedInformers = append(sharedInformers, c.informers.MultiClusterService)
}
if envVariables.EnableIstioIntegration {
sharedInformers = append(sharedInformers, c.informers.IstioGateway, c.informers.IstioVirtualService)
}
for _, informer := range sharedInformers {
go informer.Run(stopChannel)
// NOTE: Delyan could not figure out how to make informer.HasSynced == true for the CRDs in unit tests
// so until we do that - we omit WaitForCacheSync for CRDs in unit testing
if _, isCRD := crds[informer]; isCRD {
continue
}
hasSynced = append(hasSynced, informer.HasSynced)
}
klog.V(1).Infoln("Waiting for initial cache sync")
if !cache.WaitForCacheSync(stopChannel, hasSynced...) {
e := controllererrors.NewError(
controllererrors.ErrorFailedInitialCacheSync,
"failed initial sync of resources required for ingress",
)
c.MetricStore.IncErrorCount(e.Code)
return e
}
// Closing the cacheSynced channel signals to the rest of the system that... caches have been synced.
close(c.CacheSynced)
klog.V(1).Infoln("Initial cache sync done")
klog.V(1).Infoln("k8s context run finished")
return nil
}
// GetAGICPod returns the pod with specified name and namespace
func (c *Context) GetAGICPod(envVariables environment.EnvVariables) *v1.Pod {
pod, err := c.kubeClient.CoreV1().Pods(envVariables.AGICPodNamespace).Get(context.TODO(), envVariables.AGICPodName, metav1.GetOptions{})
if err != nil {
klog.Error("Error fetching AGIC Pod (This may happen if AGIC is running in a test environment). Error: ", err)
return nil
}
return pod
}
// GetBackendPool returns backend pool with specified name
func (c *Context) GetBackendPool(backendPoolName string) (*agpoolv1beta1.AzureApplicationGatewayBackendPool, error) {
agpool, exist, err := c.Caches.AzureApplicationGatewayBackendPool.GetByKey(backendPoolName)
if !exist {
e := controllererrors.NewErrorf(
controllererrors.ErrorFetchingBackendAddressPool,
"Backend pool CRD object not found for %s",
backendPoolName)
klog.Error(e.Error())
c.MetricStore.IncErrorCount(e.Code)
return nil, e
}
if err != nil {
e := controllererrors.NewErrorWithInnerErrorf(
controllererrors.ErrorFetchingBackendAddressPool,
err,
"Error fetching backend pool CRD object from store for %s",
backendPoolName)
klog.Error(e.Error())
c.MetricStore.IncErrorCount(e.Code)
return nil, e
}
return agpool.(*agpoolv1beta1.AzureApplicationGatewayBackendPool), nil
}
// GetRewriteRuleSetCustomResource returns rewrite with specified name and namespace
func (c *Context) GetRewriteRuleSetCustomResource(namespace string, name string) (*agrewritev1beta1.AzureApplicationGatewayRewrite, error) {
agrewrite, exist, err := c.Caches.AzureApplicationGatewayRewrite.GetByKey(namespace + "/" + name)
if !exist {
e := controllererrors.NewErrorf(
controllererrors.ErrorFetchingRewrite,
"Rewrite rule set custom resource object not found for %s",
name)
klog.Error(e.Error())
c.MetricStore.IncErrorCount(e.Code)
return nil, e
}
if err != nil {
e := controllererrors.NewErrorWithInnerErrorf(
controllererrors.ErrorFetchingRewrite,
err,
"Error fetching rewrite rule set custom resource object from store for %s",
name)
klog.Error(e.Error())
c.MetricStore.IncErrorCount(e.Code)
return nil, e
}
return agrewrite.(*agrewritev1beta1.AzureApplicationGatewayRewrite), nil
}
// GetInstanceUpdateStatus returns update status from when Application Gateway instances update backend pool addresses
func (c *Context) GetInstanceUpdateStatus(instanceUpdateStatusName string) (*aginstv1beta1.AzureApplicationGatewayInstanceUpdateStatus, error) {
agpool, exist, err := c.Caches.AzureApplicationGatewayInstanceUpdateStatus.GetByKey(instanceUpdateStatusName)
if !exist {
e := controllererrors.NewErrorf(
controllererrors.ErrorFetchingInstanceUpdateStatus,
"Instance update status CRD object not found for %s",
instanceUpdateStatusName)
klog.Error(e.Error())
c.MetricStore.IncErrorCount(e.Code)
return nil, e
}
if err != nil {
e := controllererrors.NewErrorWithInnerErrorf(
controllererrors.ErrorFetchingInstanceUpdateStatus,
err,
"Error fetching instance update status CRD object from store for %s",
instanceUpdateStatusName)
klog.Error(e.Error())
c.MetricStore.IncErrorCount(e.Code)
return nil, e
}
return agpool.(*aginstv1beta1.AzureApplicationGatewayInstanceUpdateStatus), nil
}
// GetProhibitedTarget returns prohibited target with specified name and namespace
func (c *Context) GetProhibitedTarget(namespace string, targetName string) *prohibitedv1.AzureIngressProhibitedTarget {
target, err := c.crdClient.AzureingressprohibitedtargetsV1().AzureIngressProhibitedTargets(namespace).Get(context.TODO(), targetName, metav1.GetOptions{})
if err != nil {
klog.Error("Error fetching Azure ingress prohibired target resource, Error: ", err)
return nil
}
return target
}
// ListServices returns a list of all the Services from cache.
func (c *Context) ListServices() []*v1.Service {
var serviceList []*v1.Service
if IsInMultiClusterMode {
for _, multiClusterServiceInterface := range c.Caches.MultiClusterService.List() {
multiClusterService := multiClusterServiceInterface.(*multiClusterService.MultiClusterService)
service, exists := convert.FromMultiClusterService(multiClusterService)
if !exists {
klog.Error("Unable to convert MultiClusterService to Service")
continue
}
if _, exists := c.namespaces[service.Namespace]; len(c.namespaces) > 0 && !exists {
continue
}
if hasTCPPort(service) {
serviceList = append(serviceList, service)
}
}
} else {
for _, serviceInterface := range c.Caches.Service.List() {
service := serviceInterface.(*v1.Service)
if _, exists := c.namespaces[service.Namespace]; len(c.namespaces) > 0 && !exists {
continue
}
if hasTCPPort(service) {
serviceList = append(serviceList, service)
}
}
}
return serviceList
}
// GetEndpointsByService returns the endpoints associated with a specific service.
func (c *Context) GetEndpointsByService(serviceKey string) (*v1.Endpoints, error) {
if IsInMultiClusterMode {
return c.generateEndpointsFromMultiClusterService(serviceKey)
}
endpointsInterface, exist, err := c.Caches.Endpoints.GetByKey(serviceKey)
if !exist {
e := controllererrors.NewErrorf(
controllererrors.ErrorFetchingEndpoints,
"Endpoint not found for %s",
serviceKey)
klog.Error(e.Error())
c.MetricStore.IncErrorCount(e.Code)
return nil, e
}
if err != nil {
e := controllererrors.NewErrorWithInnerErrorf(
controllererrors.ErrorFetchingEndpoints,
err,
"Error fetching endpoints from store for %s",
serviceKey)
klog.Error(e.Error())
c.MetricStore.IncErrorCount(e.Code)
return nil, e
}
return endpointsInterface.(*v1.Endpoints), nil
}
func (c *Context) generateEndpointsFromMultiClusterService(serviceKey string) (*v1.Endpoints, error) {
multiClusterServiceInterface, exist, err := c.Caches.MultiClusterService.GetByKey(serviceKey)
if !exist {
e := controllererrors.NewErrorf(
controllererrors.ErrorFetchingMultiClusterService,
"MultiCluster service not found for %s",
serviceKey)
klog.Error(e.Error())
c.MetricStore.IncErrorCount(e.Code)
return nil, e
}
if err != nil {
e := controllererrors.NewErrorWithInnerErrorf(
controllererrors.ErrorFetchingMultiClusterService,
err,
"Error fetching MultiCluster service from store for %s",
serviceKey)
klog.Error(e.Error())
c.MetricStore.IncErrorCount(e.Code)
return nil, e
}
multiClusterService := multiClusterServiceInterface.(*multiClusterService.MultiClusterService)
endpoints := &v1.Endpoints{}
subset := v1.EndpointSubset{}
for _, multiClusterEndpoint := range multiClusterService.Status.Endpoints {
address := v1.EndpointAddress{IP: multiClusterEndpoint.IP}
subset.Addresses = append(subset.Addresses, address)
}
for _, ports := range multiClusterService.Spec.Ports {
v1Port := v1.EndpointPort{Port: int32(ports.Port), Protocol: v1.Protocol(ports.Protocol)}
subset.Ports = append(subset.Ports, v1Port)
}
endpoints.Subsets = []v1.EndpointSubset{subset}
return endpoints, nil
}
// ListPodsByServiceSelector returns pods that are associated with a specific service.
func (c *Context) ListPodsByServiceSelector(service *v1.Service) []*v1.Pod {
selectorSet := mapset.NewSet()
for k, v := range service.Spec.Selector {
selectorSet.Add(k + ":" + v)
}
var podList []*v1.Pod
for _, podInterface := range c.Caches.Pods.List() {
pod := podInterface.(*v1.Pod)
if _, exists := c.namespaces[pod.Namespace]; len(c.namespaces) > 0 && !exists {
continue
}
podLabelSet := mapset.NewSet()
for k, v := range pod.Labels {
podLabelSet.Add(k + ":" + v)
}
if selectorSet.IsSubset(podLabelSet) && pod.Namespace == service.Namespace {
podList = append(podList, pod)
}
}
return podList
}
// IsPodReferencedByAnyIngress provides whether a POD is useful i.e. a POD is used by an ingress
func (c *Context) IsPodReferencedByAnyIngress(pod *v1.Pod) bool {
// first find all the services
services := c.listServicesByPodSelector(pod)
for _, service := range services {
if c.isServiceReferencedByAnyIngress(service) {
return true
}
}
return false
}
// IsEndpointReferencedByAnyIngress provides whether an Endpoint is useful i.e. a Endpoint is used by an ingress
func (c *Context) IsEndpointReferencedByAnyIngress(endpoints *v1.Endpoints) bool {
service := c.GetService(fmt.Sprintf("%v/%v", endpoints.Namespace, endpoints.Name))
return service != nil && c.isServiceReferencedByAnyIngress(service)
}
// ListHTTPIngresses returns a list of all the ingresses for HTTP from cache.
func (c *Context) ListHTTPIngresses() []*networking.Ingress {
var ingressList []*networking.Ingress
if IsInMultiClusterMode {
klog.V(9).Infof("Fetching MultiCluster Ingresses")
for _, mciInterface := range c.Caches.MultiClusterIngress.List() {
mci := mciInterface.(*multiClusterIngress.MultiClusterIngress)
ingress, exists := convert.FromMultiClusterIngress(mci)
if !exists {
klog.Error("Unable to convert MultiClusterIngress to Ingress")
continue
}
if _, exists := c.namespaces[ingress.Namespace]; len(c.namespaces) > 0 && !exists {
continue
}
ingressList = append(ingressList, ingress)
}
} else {
for _, ingressInterface := range c.Caches.Ingress.List() {
ingress, _ := convert.ToIngressV1(ingressInterface)
if _, exists := c.namespaces[ingress.Namespace]; len(c.namespaces) > 0 && !exists {
continue
}
ingressList = append(ingressList, ingress)
}
}
return c.filterAndSort(ingressList)
}
func (c *Context) filterAndSort(ingList []*networking.Ingress) []*networking.Ingress {
var ingressList []*networking.Ingress
for _, ingress := range ingList {
if !c.IsIngressClass(ingress) {
continue
}
if len(ingress.Spec.Rules) > 0 && !hasHTTPRule(ingress) {
continue
}
ingressList = append(ingressList, ingress)
}
// Sorting the return list ensures that the iterations over this list and
// subsequently created structs have deterministic order. This increases
// cache hits, and lowers the load on ARM.
sort.Sort(sorter.ByIngressName(ingressList))
return ingressList
}
// ListAzureProhibitedTargets returns a list of App Gwy configs, for which AGIC is not allowed to modify config.
func (c *Context) ListAzureProhibitedTargets() []*prohibitedv1.AzureIngressProhibitedTarget {
var targets []*prohibitedv1.AzureIngressProhibitedTarget
for _, obj := range c.Caches.AzureIngressProhibitedTarget.List() {
prohibitedTarget := obj.(*prohibitedv1.AzureIngressProhibitedTarget)
if _, exists := c.namespaces[prohibitedTarget.Namespace]; len(c.namespaces) > 0 && !exists {
continue
}
targets = append(targets, prohibitedTarget)
}
var prohibitedTargets []string
for _, target := range targets {
prohibitedTargets = append(prohibitedTargets, fmt.Sprintf("%s/%s", target.Namespace, target.Name))
}
klog.V(3).Infof("AzureIngressProhibitedTargets: %+v", strings.Join(prohibitedTargets, ","))
return targets
}
// GetService returns the service identified by the key.
func (c *Context) GetService(serviceKey string) *v1.Service {
if IsInMultiClusterMode {
serviceInterface, exist, err := c.Caches.MultiClusterService.GetByKey(serviceKey)
if err != nil {
klog.V(3).Infof("unable to get multicluster service from store, error occurred %s", err)
return nil
}
if !exist {
klog.V(9).Infof("MultiCluster Service %s does not exist", serviceKey)
return nil
}
multiClusterService := serviceInterface.(*multiClusterService.MultiClusterService)
service, exists := convert.FromMultiClusterService(multiClusterService)
if !exists {
klog.V(3).Infof("unable to convert multicluster service from store to service")
return nil
}
return service
}
serviceInterface, exist, err := c.Caches.Service.GetByKey(serviceKey)
if err != nil {
klog.V(3).Infof("unable to get service from store, error occurred %s", err)
return nil
}
if !exist {
klog.V(9).Infof("Service %s does not exist", serviceKey)
return nil
}
service := serviceInterface.(*v1.Service)
return service
}
// GetSecret returns the secret identified by the key
func (c *Context) GetSecret(secretKey string) *v1.Secret {
secretInterface, exist, err := c.Caches.Secret.GetByKey(secretKey)
if err != nil {
klog.Error("Error fetching secret from store:", err)
return nil
}
if !exist {
klog.Error("Error fetching secret from store! Service does not exist:", secretKey)
return nil
}
secret := secretInterface.(*v1.Secret)
return secret
}
// GetVirtualServicesForGateway returns the VirtualServices for the provided gateway
func (c *Context) GetVirtualServicesForGateway(gateway v1alpha3.Gateway) []*v1alpha3.VirtualService {
virtualServices := make([]*v1alpha3.VirtualService, 0)
allVirtualServices := c.ListIstioVirtualServices()
gatewayName := gateway.Name
for _, service := range allVirtualServices {
hasGateway := false
for _, serviceGateway := range service.Spec.Gateways {
if gatewayName == serviceGateway {
hasGateway = true
}
}
if hasGateway {
virtualServices = append(virtualServices, service)
}
}
var virtualServiceLogging []string
for _, virtualService := range virtualServices {
virtualServiceLogging = append(virtualServiceLogging, fmt.Sprintf("%s/%s", virtualService.Namespace, virtualService.Name))
}
klog.V(3).Infof("Found Virtual Services: %+v", strings.Join(virtualServiceLogging, ","))
return virtualServices
}
// GetEndpointsForVirtualService returns a list of Endpoints associated with a Virtual Service
func (c *Context) GetEndpointsForVirtualService(virtualService v1alpha3.VirtualService) v1.EndpointsList {
endpointList := make([]v1.Endpoints, 0)
namespace := virtualService.Namespace
for _, httpRouteRule := range virtualService.Spec.HTTP {
for _, route := range httpRouteRule.Route {
serviceKey := fmt.Sprintf("%v/%v", namespace, route.Destination.Host)
endpoint, err := c.GetEndpointsByService(serviceKey)
if err == nil {
endpointList = append(endpointList, *endpoint)
}
}
}
return v1.EndpointsList{
Items: endpointList,
}
}
// GetGateways returns all Istio Gateways that are annotated.
func (c *Context) GetGateways() []*v1alpha3.Gateway {
annotatedGateways := make([]*v1alpha3.Gateway, 0)
for _, gateway := range c.ListIstioGateways() {
if annotated := c.IsIstioGatewayIngress(gateway); annotated {
annotatedGateways = append(annotatedGateways, gateway)
}
}
return annotatedGateways
}
// GetInfrastructureResourceGroupID returns the subscription and resource group name of the underling infrastructure.
// This uses ProviderID which is ID of the node assigned by the cloud provider in the format: <ProviderName>://<ProviderSpecificNodeID>
func (c *Context) GetInfrastructureResourceGroupID() (azure.SubscriptionID, azure.ResourceGroup, error) {
nodes, err := c.kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
e := controllererrors.NewErrorWithInnerError(
controllererrors.ErrorFetchingNodes,
err,
"no nodes were found in the node list",
)
c.MetricStore.IncErrorCount(e.Code)
return azure.SubscriptionID(""), azure.ResourceGroup(""), e
}
if nodes == nil || len(nodes.Items) == 0 {
e := controllererrors.NewError(
controllererrors.ErrorNoNodesFound,
"no nodes were found in the node list",
)
c.MetricStore.IncErrorCount(e.Code)
return azure.SubscriptionID(""), azure.ResourceGroup(""), e
}
if !strings.HasPrefix(nodes.Items[0].Spec.ProviderID, providerPrefix) {
e := controllererrors.NewError(
controllererrors.ErrorUnrecognizedNodeProviderPrefix,
"providerID is not prefixed with azure://",
)
c.MetricStore.IncErrorCount(e.Code)
return azure.SubscriptionID(""), azure.ResourceGroup(""), e
}
subscriptionID, resourceGroup, _ := azure.ParseResourceID(strings.TrimPrefix(nodes.Items[0].Spec.ProviderID, providerPrefix))
return subscriptionID, resourceGroup, nil
}
// UpdateIngressStatus adds IP address in Ingress Status
func (c *Context) UpdateIngressStatus(ingressToUpdate networking.Ingress, newIP IPAddress) error {
if IsNetworkingV1PackageSupported && !IsInMultiClusterMode {
return c.updateV1IngressStatus(ingressToUpdate, newIP)
} else if IsInMultiClusterMode {
return c.updateMultiClusterIngressStatus(ingressToUpdate, newIP)
} else {
return c.updateV1beta1IngressStatus(ingressToUpdate, newIP)
}
}
func (c *Context) updateV1IngressStatus(ingressToUpdate networking.Ingress, newIP IPAddress) error {
ingressClient := c.kubeClient.NetworkingV1().Ingresses(ingressToUpdate.Namespace)
ingress, err := ingressClient.Get(context.TODO(), ingressToUpdate.Name, metav1.GetOptions{})
if err != nil {
e := controllererrors.NewErrorWithInnerErrorf(
controllererrors.ErrorUpdatingIngressStatus,
err,
"Unable to get ingress %s/%s", ingressToUpdate.Namespace, ingressToUpdate.Name,
)
c.MetricStore.IncErrorCount(e.Code)
return e
}
for _, lbi := range ingress.Status.LoadBalancer.Ingress {
existingIP := lbi.IP
if existingIP == string(newIP) {
klog.Infof("IP %s already set on Ingress %s/%s", lbi.IP, ingress.Namespace, ingress.Name)
return nil
}
}
loadBalancerIngresses := []networking.IngressLoadBalancerIngress{}
if newIP != "" {
loadBalancerIngresses = append(loadBalancerIngresses, networking.IngressLoadBalancerIngress{
IP: string(newIP),
})
}
ingress.Status.LoadBalancer.Ingress = loadBalancerIngresses
if _, err := ingressClient.UpdateStatus(context.TODO(), ingress, metav1.UpdateOptions{}); err != nil {
e := controllererrors.NewErrorWithInnerErrorf(
controllererrors.ErrorUpdatingIngressStatus,
err,
"Unable to update ingress %s/%s status", ingress.Namespace, ingress.Name,
)
c.MetricStore.IncErrorCount(e.Code)
return e
}
return nil
}
func (c *Context) updateV1beta1IngressStatus(ingressToUpdate networking.Ingress, newIP IPAddress) error {
ingressClient := c.kubeClient.ExtensionsV1beta1().Ingresses(ingressToUpdate.Namespace)
ingress, err := ingressClient.Get(context.TODO(), ingressToUpdate.Name, metav1.GetOptions{})
if err != nil {
e := controllererrors.NewErrorWithInnerErrorf(
controllererrors.ErrorUpdatingIngressStatus,
err,
"Unable to get ingress %s/%s", ingressToUpdate.Namespace, ingressToUpdate.Name,
)
c.MetricStore.IncErrorCount(e.Code)
return e
}
for _, lbi := range ingress.Status.LoadBalancer.Ingress {
existingIP := lbi.IP
if existingIP == string(newIP) {
klog.Infof("IP %s already set on Ingress %s/%s", lbi.IP, ingress.Namespace, ingress.Name)
return nil
}
}
loadBalancerIngresses := []extensionsv1beta1.IngressLoadBalancerIngress{}
if newIP != "" {
loadBalancerIngresses = append(loadBalancerIngresses, extensionsv1beta1.IngressLoadBalancerIngress{
IP: string(newIP),
})
}
ingress.Status.LoadBalancer.Ingress = loadBalancerIngresses
if _, err := ingressClient.UpdateStatus(context.TODO(), ingress, metav1.UpdateOptions{}); err != nil {
e := controllererrors.NewErrorWithInnerErrorf(
controllererrors.ErrorUpdatingIngressStatus,
err,
"Unable to update ingress %s/%s status", ingress.Namespace, ingress.Name,
)
c.MetricStore.IncErrorCount(e.Code)
return e
}
return nil
}
func (c *Context) updateMultiClusterIngressStatus(ingressToUpdate networking.Ingress, newIP IPAddress) error {
ingressClient := c.multiClusterCrdClient.MulticlusteringressesV1alpha1().MultiClusterIngresses(ingressToUpdate.Namespace)
ingress, err := ingressClient.Get(context.TODO(), ingressToUpdate.Name, metav1.GetOptions{})
if err != nil {
e := controllererrors.NewErrorWithInnerErrorf(
controllererrors.ErrorUpdatingIngressStatus,
err,
"Unable to get ingress %s/%s", ingressToUpdate.Namespace, ingressToUpdate.Name,
)
c.MetricStore.IncErrorCount(e.Code)
return e
}
for _, lbi := range ingress.Status.LoadBalancer.Ingress {
existingIP := lbi.IP
if existingIP == string(newIP) {
klog.Infof("IP %s already set on Ingress %s/%s", lbi.IP, ingress.Namespace, ingress.Name)
return nil
}
}
loadBalancerIngresses := []networking.IngressLoadBalancerIngress{}
if newIP != "" {
loadBalancerIngresses = append(loadBalancerIngresses, networking.IngressLoadBalancerIngress{
IP: string(newIP),
})
}
ingress.Status.LoadBalancer.Ingress = loadBalancerIngresses
if _, err := ingressClient.UpdateStatus(context.TODO(), ingress, metav1.UpdateOptions{}); err != nil {
e := controllererrors.NewErrorWithInnerErrorf(
controllererrors.ErrorUpdatingIngressStatus,
err,
"Unable to update ingress %s/%s status", ingress.Namespace, ingress.Name,
)
c.MetricStore.IncErrorCount(e.Code)
return e
}
return nil
}
func hasHTTPRule(ingress *networking.Ingress) bool {
for _, rule := range ingress.Spec.Rules {
if rule.HTTP != nil {
return true
}
}
return false
}
func hasTCPPort(service *v1.Service) bool {
for _, port := range service.Spec.Ports {
if port.Protocol == v1.ProtocolTCP {
return true
}
}
return false
}
func (c *Context) listServicesByPodSelector(pod *v1.Pod) []*v1.Service {
labelSet := mapset.NewSet()
for k, v := range pod.Labels {
labelSet.Add(k + ":" + v)
}
var serviceList []*v1.Service
for _, service := range c.ListServices() {
serviceLabelSet := mapset.NewSet()
for k, v := range service.Spec.Selector {
serviceLabelSet.Add(k + ":" + v)
}
if serviceLabelSet.IsSubset(labelSet) {
serviceList = append(serviceList, service)
}
}
return serviceList
}
func (c *Context) isServiceReferencedByAnyIngress(service *v1.Service) bool {
for _, ingress := range c.ListHTTPIngresses() {
for _, rule := range ingress.Spec.Rules {
if rule.HTTP == nil {
continue
}
for _, path := range rule.HTTP.Paths {
// TODO(akshaysngupta) Use service ports
if path.Backend.Service.Name == service.Name {
return true
}
}
}
}
return false
}
func (c *Context) getIngressClassResource(ingressClassName string) *networking.IngressClass {
if class := c.getIngressClassResourceFromCache(ingressClassName); class != nil {
return class
}
return c.getIngressClassResourceFromCluster(ingressClassName)
}
// getIngressClassResource gets ingress class object with specified name
func (c *Context) getIngressClassResourceFromCache(ingressClassName string) *networking.IngressClass {
if c.Caches.IngressClass == nil {
return nil
}
ingressClassInterface, exist, err := c.Caches.IngressClass.GetByKey(ingressClassName)
if err != nil {
klog.Errorf("Unable to fetch IngressClass '%s' from cache. Error: %s", ingressClassName, err)
return nil
}
if !exist {
return nil
}
return ingressClassInterface.(*networking.IngressClass)
}
func (c *Context) getIngressClassResourceFromCluster(ingressClassName string) *networking.IngressClass {
ingressClass, err := c.kubeClient.NetworkingV1().IngressClasses().Get(context.TODO(), ingressClassName, metav1.GetOptions{})
if err != nil {
klog.Errorf("Unable to fetch IngressClass '%s' from cluster. Error: %s", ingressClassName, err)
return nil
}
return ingressClass
}
// IsIngressClass checks if the Ingress resource can be handled by the Application Gateway ingress controller.
func (c *Context) IsIngressClass(ing *networking.Ingress) bool {
// match by annotation (for Backward compatibility)
if className, err := annotations.IngressClass(ing); err == nil && className != "" {
return className == c.ingressClassControllerName
}
// match by ingress class resource
if c.ingressClassResourceEnabled {
// if IngressClassName in ingress that compare it with the controller type
if ing.Spec.IngressClassName != nil {
ingressClass := c.getIngressClassResource(*ing.Spec.IngressClassName)
return ingressClass != nil && ingressClass.Spec.Controller == c.ingressClassControllerName &&
c.ingressClassResourceName == *ing.Spec.IngressClassName
}
// if IngressClassName is nil, then match if AGIC is default ingress
return c.ingressClassResourceDefault
}
return false
}
// IsIstioGatewayIngress checks if this gateway should be handled by AGIC or not
func (c *Context) IsIstioGatewayIngress(gateway *v1alpha3.Gateway) bool {
className, err := annotations.IstioGatewayIngressClass(gateway)
if err != nil {
return false
}
return className == c.ingressClassControllerName
}