pkg/providers/controller.go (475 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package providers import ( "context" "fmt" "os" "time" "go.uber.org/zap" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" networkingv1 "k8s.io/client-go/listers/networking/v1" networkingv1beta1 "k8s.io/client-go/listers/networking/v1beta1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" "github.com/apache/apisix-ingress-controller/pkg/api" "github.com/apache/apisix-ingress-controller/pkg/apisix" "github.com/apache/apisix-ingress-controller/pkg/config" "github.com/apache/apisix-ingress-controller/pkg/kube" apisixscheme "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/scheme" v2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2" "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/metrics" apisixprovider "github.com/apache/apisix-ingress-controller/pkg/providers/apisix" apisixtranslation "github.com/apache/apisix-ingress-controller/pkg/providers/apisix/translation" "github.com/apache/apisix-ingress-controller/pkg/providers/gateway" ingressprovider "github.com/apache/apisix-ingress-controller/pkg/providers/ingress" "github.com/apache/apisix-ingress-controller/pkg/providers/k8s" "github.com/apache/apisix-ingress-controller/pkg/providers/k8s/namespace" "github.com/apache/apisix-ingress-controller/pkg/providers/k8s/pod" "github.com/apache/apisix-ingress-controller/pkg/providers/translation" providertypes "github.com/apache/apisix-ingress-controller/pkg/providers/types" "github.com/apache/apisix-ingress-controller/pkg/providers/utils" ) const ( // _component is used for event component _component = "ApisixIngress" // minimum interval for ingress sync to APISIX _minimumApisixResourceSyncInterval = 60 * time.Second ) // Controller is the ingress apisix controller object. type Controller struct { name string namespace string cfg *config.Config apisix apisix.APISIX apiServer *api.Server MetricsCollector metrics.Collector kubeClient *kube.KubeClient // recorder event recorder record.EventRecorder // leaderContextCancelFunc will be called when apisix-ingress-controller // decides to give up its leader role. leaderContextCancelFunc context.CancelFunc translator translation.Translator apisixTranslator apisixtranslation.ApisixTranslator informers *providertypes.ListerInformer namespaceProvider namespace.WatchingNamespaceProvider podProvider pod.Provider kubeProvider k8s.Provider gatewayProvider *gateway.Provider apisixProvider apisixprovider.Provider ingressProvider ingressprovider.Provider } // NewController creates an ingress apisix controller object. func NewController(cfg *config.Config) (*Controller, error) { podName := os.Getenv("POD_NAME") podNamespace := os.Getenv("POD_NAMESPACE") if podNamespace == "" { podNamespace = "default" } client, err := apisix.NewClient(cfg.APISIX.AdminAPIVersion) if err != nil { return nil, err } kubeClient, err := kube.NewKubeClient(cfg) if err != nil { return nil, err } apiSrv, err := api.NewServer(cfg) if err != nil { return nil, err } // recorder utilruntime.Must(apisixscheme.AddToScheme(scheme.Scheme)) eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.Client.CoreV1().Events("")}) c := &Controller{ name: podName, namespace: podNamespace, cfg: cfg, apiServer: apiSrv, apisix: client, MetricsCollector: metrics.NewPrometheusCollector(), kubeClient: kubeClient, recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}), } return c, nil } // Eventf implements the resourcelock.EventRecorder interface. func (c *Controller) Eventf(_ runtime.Object, eventType string, reason string, message string, _ ...interface{}) { log.Infow(reason, zap.String("message", message), zap.String("event_type", eventType)) } // Run launches the controller. func (c *Controller) Run(stop chan struct{}) error { rootCtx, rootCancel := context.WithCancel(context.Background()) defer rootCancel() go func() { <-stop rootCancel() }() c.MetricsCollector.ResetLeader(false) go func() { log.Info("start api server") if err := c.apiServer.Run(rootCtx.Done()); err != nil { log.Errorf("failed to launch API Server: %s", err) } }() lock := &resourcelock.LeaseLock{ LeaseMeta: metav1.ObjectMeta{ Namespace: c.namespace, Name: c.cfg.Kubernetes.ElectionID, }, Client: c.kubeClient.Client.CoordinationV1(), LockConfig: resourcelock.ResourceLockConfig{ Identity: c.name, EventRecorder: c, }, } cfg := leaderelection.LeaderElectionConfig{ Lock: lock, LeaseDuration: 15 * time.Second, RenewDeadline: 5 * time.Second, RetryPeriod: 2 * time.Second, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: c.run, OnNewLeader: func(identity string) { log.Warnf("found a new leader %s", identity) if identity != c.name { log.Infow("controller now is running as a candidate", zap.String("namespace", c.namespace), zap.String("pod", c.name), ) c.MetricsCollector.ResetLeader(false) // delete the old APISIX cluster, so that the cached state // like synchronization won't be used next time the candidate // becomes the leader again. c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName) } }, OnStoppedLeading: func() { log.Infow("controller now is running as a candidate", zap.String("namespace", c.namespace), zap.String("pod", c.name), ) c.MetricsCollector.ResetLeader(false) // delete the old APISIX cluster, so that the cached state // like synchronization won't be used next time the candidate // becomes the leader again. c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName) }, }, ReleaseOnCancel: true, Name: "ingress-apisix", } elector, err := leaderelection.NewLeaderElector(cfg) if err != nil { log.Errorf("failed to create leader elector: %s", err.Error()) return err } election: curCtx, cancel := context.WithCancel(rootCtx) c.leaderContextCancelFunc = cancel elector.Run(curCtx) select { case <-rootCtx.Done(): return nil default: goto election } } func (c *Controller) initSharedInformers() *providertypes.ListerInformer { kubeFactory := c.kubeClient.NewSharedIndexInformerFactory() apisixFactory := c.kubeClient.NewAPISIXSharedIndexInformerFactory() var ( ingressInformer cache.SharedIndexInformer ingressListerV1 networkingv1.IngressLister ingressListerV1beta1 networkingv1beta1.IngressLister ) var ( apisixUpstreamInformer cache.SharedIndexInformer apisixRouteInformer cache.SharedIndexInformer apisixPluginConfigInformer cache.SharedIndexInformer apisixConsumerInformer cache.SharedIndexInformer apisixTlsInformer cache.SharedIndexInformer apisixClusterConfigInformer cache.SharedIndexInformer ApisixGlobalRuleInformer cache.SharedIndexInformer apisixRouteListerV2 v2.ApisixRouteLister apisixUpstreamListerV2 v2.ApisixUpstreamLister apisixTlsListerV2 v2.ApisixTlsLister apisixClusterConfigListerV2 v2.ApisixClusterConfigLister apisixConsumerListerV2 v2.ApisixConsumerLister apisixPluginConfigListerV2 v2.ApisixPluginConfigLister ApisixGlobalRuleListerV2 v2.ApisixGlobalRuleLister ) switch c.cfg.Kubernetes.APIVersion { case config.ApisixV2: apisixRouteInformer = apisixFactory.Apisix().V2().ApisixRoutes().Informer() apisixTlsInformer = apisixFactory.Apisix().V2().ApisixTlses().Informer() apisixClusterConfigInformer = apisixFactory.Apisix().V2().ApisixClusterConfigs().Informer() apisixConsumerInformer = apisixFactory.Apisix().V2().ApisixConsumers().Informer() apisixPluginConfigInformer = apisixFactory.Apisix().V2().ApisixPluginConfigs().Informer() apisixUpstreamInformer = apisixFactory.Apisix().V2().ApisixUpstreams().Informer() ApisixGlobalRuleInformer = apisixFactory.Apisix().V2().ApisixGlobalRules().Informer() apisixRouteListerV2 = apisixFactory.Apisix().V2().ApisixRoutes().Lister() apisixUpstreamListerV2 = apisixFactory.Apisix().V2().ApisixUpstreams().Lister() apisixTlsListerV2 = apisixFactory.Apisix().V2().ApisixTlses().Lister() apisixClusterConfigListerV2 = apisixFactory.Apisix().V2().ApisixClusterConfigs().Lister() apisixConsumerListerV2 = apisixFactory.Apisix().V2().ApisixConsumers().Lister() apisixPluginConfigListerV2 = apisixFactory.Apisix().V2().ApisixPluginConfigs().Lister() ApisixGlobalRuleListerV2 = apisixFactory.Apisix().V2().ApisixGlobalRules().Lister() default: panic(fmt.Errorf("unsupported API version %v", c.cfg.Kubernetes.APIVersion)) } apisixUpstreamLister := kube.NewApisixUpstreamLister(apisixUpstreamListerV2) apisixRouteLister := kube.NewApisixRouteLister(apisixRouteListerV2) apisixTlsLister := kube.NewApisixTlsLister(apisixTlsListerV2) apisixClusterConfigLister := kube.NewApisixClusterConfigLister(apisixClusterConfigListerV2) apisixConsumerLister := kube.NewApisixConsumerLister(apisixConsumerListerV2) apisixPluginConfigLister := kube.NewApisixPluginConfigLister(apisixPluginConfigListerV2) ApisixGlobalRuleLister := kube.NewApisixGlobalRuleLister(c.cfg.Kubernetes.APIVersion, ApisixGlobalRuleListerV2) epLister, epInformer := kube.NewEndpointListerAndInformer(kubeFactory, c.cfg.Kubernetes.WatchEndpointSlices) svcInformer := kubeFactory.Core().V1().Services().Informer() svcLister := kubeFactory.Core().V1().Services().Lister() podInformer := kubeFactory.Core().V1().Pods().Informer() podLister := kubeFactory.Core().V1().Pods().Lister() secretInformer := kubeFactory.Core().V1().Secrets().Informer() secretLister := kubeFactory.Core().V1().Secrets().Lister() configmapInformer := kubeFactory.Core().V1().ConfigMaps().Informer() configmapLister := kubeFactory.Core().V1().ConfigMaps().Lister() switch c.cfg.Kubernetes.IngressVersion { case config.IngressNetworkingV1beta1: ingressInformer = kubeFactory.Networking().V1beta1().Ingresses().Informer() ingressListerV1beta1 = kubeFactory.Networking().V1beta1().Ingresses().Lister() default: ingressInformer = kubeFactory.Networking().V1().Ingresses().Informer() ingressListerV1 = kubeFactory.Networking().V1().Ingresses().Lister() } ingressLister := kube.NewIngressLister(ingressListerV1, ingressListerV1beta1) listerInformer := &providertypes.ListerInformer{ ApisixFactory: apisixFactory, KubeFactory: kubeFactory, EpLister: epLister, EpInformer: epInformer, SvcLister: svcLister, SvcInformer: svcInformer, SecretLister: secretLister, SecretInformer: secretInformer, PodLister: podLister, PodInformer: podInformer, ConfigMapInformer: configmapInformer, ConfigMapLister: configmapLister, IngressInformer: ingressInformer, IngressLister: ingressLister, ApisixUpstreamLister: apisixUpstreamLister, ApisixRouteLister: apisixRouteLister, ApisixConsumerLister: apisixConsumerLister, ApisixTlsLister: apisixTlsLister, ApisixPluginConfigLister: apisixPluginConfigLister, ApisixClusterConfigLister: apisixClusterConfigLister, ApisixGlobalRuleLister: ApisixGlobalRuleLister, ApisixUpstreamInformer: apisixUpstreamInformer, ApisixPluginConfigInformer: apisixPluginConfigInformer, ApisixRouteInformer: apisixRouteInformer, ApisixClusterConfigInformer: apisixClusterConfigInformer, ApisixConsumerInformer: apisixConsumerInformer, ApisixTlsInformer: apisixTlsInformer, ApisixGlobalRuleInformer: ApisixGlobalRuleInformer, } return listerInformer } func (c *Controller) run(ctx context.Context) { log.Infow("controller tries to leading ...", zap.String("namespace", c.namespace), zap.String("pod", c.name), ) var cancelFunc context.CancelFunc ctx, cancelFunc = context.WithCancel(ctx) defer cancelFunc() // give up leader defer c.leaderContextCancelFunc() clusterOpts := &apisix.ClusterOptions{ AdminAPIVersion: c.cfg.APISIX.AdminAPIVersion, Name: c.cfg.APISIX.DefaultClusterName, AdminKey: c.cfg.APISIX.DefaultClusterAdminKey, BaseURL: c.cfg.APISIX.DefaultClusterBaseURL, MetricsCollector: c.MetricsCollector, SyncComparison: c.cfg.ApisixResourceSyncComparison, } err := c.apisix.AddCluster(ctx, clusterOpts) if err != nil && err != apisix.ErrDuplicatedCluster { // TODO give up the leader role log.Errorf("failed to add default cluster: %s", err) return } if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil { // TODO give up the leader role log.Errorf("failed to wait the default cluster to be ready: %s", err) // re-create apisix cluster, used in next c.run if err = c.apisix.UpdateCluster(ctx, clusterOpts); err != nil { log.Errorf("failed to update default cluster: %s", err) return } return } // Creation Phase log.Info("creating controller") c.informers = c.initSharedInformers() common := &providertypes.Common{ ControllerNamespace: c.namespace, ListerInformer: c.informers, Config: c.cfg, APISIX: c.apisix, KubeClient: c.kubeClient, MetricsCollector: c.MetricsCollector, Recorder: c.recorder, } c.namespaceProvider, err = namespace.NewWatchingNamespaceProvider(ctx, c.kubeClient, c.cfg) if err != nil { ctx.Done() return } c.podProvider, err = pod.NewProvider(common, c.namespaceProvider) if err != nil { ctx.Done() return } c.translator = translation.NewTranslator(&translation.TranslatorOptions{ APIVersion: c.cfg.Kubernetes.APIVersion, EndpointLister: c.informers.EpLister, ServiceLister: c.informers.SvcLister, SecretLister: c.informers.SecretLister, PodLister: c.informers.PodLister, ApisixUpstreamLister: c.informers.ApisixUpstreamLister, PodProvider: c.podProvider, IngressClassName: c.cfg.Kubernetes.IngressClass, }) c.apisixProvider, c.apisixTranslator, err = apisixprovider.NewProvider(common, c.namespaceProvider, c.translator) if err != nil { ctx.Done() return } c.ingressProvider, err = ingressprovider.NewProvider(common, c.namespaceProvider, c.translator, c.apisixTranslator) if err != nil { ctx.Done() return } c.kubeProvider, err = k8s.NewProvider(common, c.translator, c.namespaceProvider, c.apisixProvider, c.ingressProvider) if err != nil { ctx.Done() return } if c.cfg.Kubernetes.EnableGatewayAPI { c.gatewayProvider, err = gateway.NewGatewayProvider(&gateway.ProviderOptions{ Cfg: c.cfg, APISIX: c.apisix, APISIXClusterName: c.cfg.APISIX.DefaultClusterName, KubeTranslator: c.translator, RestConfig: nil, KubeClient: c.kubeClient.Client, MetricsCollector: c.MetricsCollector, NamespaceProvider: c.namespaceProvider, ListerInformer: common.ListerInformer, }) if err != nil { ctx.Done() return } } // Init Phase log.Info("init namespaces") if err = c.namespaceProvider.Init(ctx); err != nil { ctx.Done() return } log.Info("wait for resource sync") // Wait for resource sync if ok := c.informers.StartAndWaitForCacheSync(ctx); !ok { ctx.Done() return } log.Info("init providers") // Compare resource if err = c.apisixProvider.Init(ctx); err != nil { ctx.Done() return } // Run Phase log.Info("try to run providers") e := utils.ParallelExecutor{} e.Add(func() { c.checkClusterHealth(ctx, cancelFunc) }) e.Add(func() { c.namespaceProvider.Run(ctx) }) e.Add(func() { c.kubeProvider.Run(ctx) }) e.Add(func() { c.apisixProvider.Run(ctx) }) e.Add(func() { c.ingressProvider.Run(ctx) }) if c.cfg.Kubernetes.EnableGatewayAPI { e.Add(func() { c.gatewayProvider.Run(ctx) }) } e.Add(func() { c.resourceSyncLoop(ctx, c.cfg.ApisixResourceSyncInterval.Duration) }) c.MetricsCollector.ResetLeader(true) log.Infow("controller now is running as leader", zap.String("namespace", c.namespace), zap.String("pod", c.name), ) <-ctx.Done() e.Wait() for _, execErr := range e.Errors() { log.Error(execErr.Error()) } if len(e.Errors()) > 0 { log.Error("Start failed, abort...") cancelFunc() } } func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) { defer cancelFunc() t := time.NewTicker(5 * time.Second) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: } err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HealthCheck(ctx) if err != nil { c.apiServer.HealthState.Lock() c.apiServer.HealthState.Err = err c.apiServer.HealthState.Unlock() // Finally failed health check, then give up leader. log.Warnf("failed to check health for default cluster: %s, give up leader", err) } else { if c.apiServer.HealthState.Err != nil { c.apiServer.HealthState.Lock() c.apiServer.HealthState.Err = err c.apiServer.HealthState.Unlock() } log.Debugf("success check health for default cluster") c.MetricsCollector.IncrCheckClusterHealth(c.name) } } } func (c *Controller) syncAllResources(interval time.Duration) { e := utils.ParallelExecutor{} e.Add(c.ingressProvider.ResourceSync) e.Add(func() { c.apisixProvider.ResourceSync(interval) }) e.Wait() } func (c *Controller) resourceSyncLoop(ctx context.Context, interval time.Duration) { if interval == 0 { log.Info("apisix-resource-sync-interval set to 0, periodically synchronization disabled.") return } // The interval shall not be less than 60 seconds. if interval < _minimumApisixResourceSyncInterval { log.Warnw("The apisix-resource-sync-interval shall not be less than 60 seconds.", zap.String("apisix-resource-sync-interval", interval.String()), ) interval = _minimumApisixResourceSyncInterval } ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: c.syncAllResources(interval) continue case <-ctx.Done(): return } } }