pkg/providers/gateway/gateway.go (222 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package gateway import ( "context" "fmt" "time" "go.uber.org/zap" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/providers/utils" "github.com/apache/apisix-ingress-controller/pkg/types" ) type gatewayController struct { controller *Provider workqueue workqueue.RateLimitingInterface workers int } func newGatewayController(c *Provider) *gatewayController { ctl := &gatewayController{ controller: c, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "Gateway"), workers: 1, } ctl.controller.gatewayInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ctl.onAdd, UpdateFunc: ctl.onUpdate, DeleteFunc: ctl.OnDelete, }) return ctl } func (c *gatewayController) run(ctx context.Context) { log.Info("gateway controller started") defer log.Info("gateway controller exited") defer c.workqueue.ShutDown() if !cache.WaitForCacheSync(ctx.Done(), c.controller.gatewayInformer.HasSynced) { log.Error("cache sync failed") return } for i := 0; i < c.workers; i++ { go c.runWorker(ctx) } <-ctx.Done() } func (c *gatewayController) runWorker(ctx context.Context) { for { obj, quit := c.workqueue.Get() if quit { return } err := c.sync(ctx, obj.(*types.Event)) c.workqueue.Done(obj) c.handleSyncErr(obj, err) } } func (c *gatewayController) sync(ctx context.Context, ev *types.Event) error { key := ev.Object.(string) namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { log.Errorw("found Gateway resource with invalid meta namespace key", zap.Error(err), zap.String("key", key), ) return err } gateway, err := c.controller.gatewayLister.Gateways(namespace).Get(name) if err != nil { if !k8serrors.IsNotFound(err) { log.Errorw("failed to get Gateway", zap.Error(err), zap.String("key", key), ) return err } if ev.Type != types.EventDelete { log.Warnw("Gateway was deleted before it can be delivered", zap.String("key", key), ) // Don't need to retry. return nil } } if ev.Type == types.EventDelete { if gateway != nil { // We still find the resource while we are processing the DELETE event, // that means object with same namespace and name was created, discarding // this stale DELETE event. log.Warnw("discard the stale Gateway delete event since it exists", zap.String("key", key), ) return nil } gateway = ev.Tombstone.(*gatewayv1beta1.Gateway) err = c.controller.RemoveListeners(gateway.Namespace, gateway.Name) if err != nil { return err } } else { gatewayClassName := string(gateway.Spec.GatewayClassName) if c.controller.HasGatewayClass(gatewayClassName) { // TODO: handle listeners listeners, err := c.controller.translator.TranslateGatewayV1beta1(gateway) if err != nil { return err } err = c.controller.AddListeners(gateway.Namespace, gateway.Name, listeners) if err != nil { return err } } else { gatewayClass, err := c.controller.gatewayClassLister.Get(gatewayClassName) if err != nil { return err } if gatewayClass.Spec.ControllerName == GatewayClassName { log.Warn("gatewayClass not synced") return fmt.Errorf("wait gatewayClass %s synced", gatewayClassName) } } } // TODO The current implementation does not fully support the definition of Gateway. // We can update `spec.addresses` with the current data plane information. // At present, we choose to directly update `GatewayStatus.Addresses` // to indicate that we have picked the Gateway resource. c.recordStatus(gateway, string(gatewayv1beta1.ListenerReasonReady), metav1.ConditionTrue, gateway.Generation) return nil } func (c *gatewayController) handleSyncErr(obj interface{}, err error) { if err == nil { c.workqueue.Forget(obj) c.controller.MetricsCollector.IncrSyncOperation("gateway", "success") return } event := obj.(*types.Event) if k8serrors.IsNotFound(err) && event.Type != types.EventDelete { log.Infow("sync gateway but not found, ignore", zap.String("event_type", event.Type.String()), zap.String("gateway", event.Object.(string)), ) c.workqueue.Forget(event) return } log.Warnw("sync gateway failed, will retry", zap.Any("object", obj), zap.Error(err), ) c.workqueue.AddRateLimited(obj) c.controller.MetricsCollector.IncrSyncOperation("gateway", "failure") } func (c *gatewayController) onAdd(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { log.Errorw("found gateway resource with bad meta namespace key", zap.Error(err), zap.Any("obj", obj), ) return } if !c.controller.NamespaceProvider.IsWatchingNamespace(key) { return } log.Debugw("gateway add event arrived", zap.Any("object", obj), ) c.workqueue.Add(&types.Event{ Type: types.EventAdd, Object: key, }) } func (c *gatewayController) onUpdate(oldObj, newObj interface{}) { } func (c *gatewayController) OnDelete(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { log.Errorw("failed to handle deletion Gateway meta key", zap.Error(err), zap.Any("obj", obj), ) return } gateway, ok := obj.(*gatewayv1beta1.Gateway) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { log.Errorw("Gateway in bad tombstone state", zap.String("key", key), zap.Any("obj", obj), ) return } gateway = tombstone.Obj.(*gatewayv1beta1.Gateway) } c.workqueue.Add(&types.Event{ Type: types.EventDelete, Object: key, Tombstone: gateway, }) } // recordStatus record resources status func (c *gatewayController) recordStatus(v *gatewayv1beta1.Gateway, reason string, status metav1.ConditionStatus, generation int64) { v = v.DeepCopy() gatewayCondition := metav1.Condition{ Type: string(gatewayv1beta1.ListenerConditionReady), Reason: reason, Status: status, Message: "Gateway's status has been successfully updated", ObservedGeneration: generation, } if v.Status.Conditions == nil { conditions := make([]metav1.Condition, 0) v.Status.Conditions = conditions } else { meta.SetStatusCondition(&v.Status.Conditions, gatewayCondition) } lbips, err := utils.IngressLBStatusIPs(c.controller.Cfg.IngressPublishService, c.controller.Cfg.IngressStatusAddress, c.controller.ListerInformer.SvcLister) if err != nil { log.Errorw("failed to get APISIX gateway external IPs", zap.Error(err), ) } v.Status.Addresses = utils.CoreV1ToGatewayV1beta1Addr(lbips) if _, errRecord := c.controller.gatewayClient.GatewayV1beta1().Gateways(v.Namespace).UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil { log.Errorw("failed to record status change for Gateway resource", zap.Error(errRecord), zap.String("name", v.Name), zap.String("namespace", v.Namespace), ) } }