pkg/providers/gateway/gateway_tcproute.go (228 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package gateway import ( "context" "time" "go.uber.org/zap" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/providers/translation" "github.com/apache/apisix-ingress-controller/pkg/providers/utils" "github.com/apache/apisix-ingress-controller/pkg/types" ) type gatewayTCPRouteController struct { controller *Provider workqueue workqueue.RateLimitingInterface workers int } func newGatewayTCPRouteController(c *Provider) *gatewayTCPRouteController { ctrl := &gatewayTCPRouteController{ controller: c, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "GatewayTCPRoute"), workers: 1, } ctrl.controller.gatewayTCPRouteInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ctrl.onAdd, UpdateFunc: ctrl.onUpdate, DeleteFunc: ctrl.OnDelete, }) return ctrl } func (c *gatewayTCPRouteController) sync(ctx context.Context, ev *types.Event) error { key := ev.Object.(string) namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { log.Errorw("found Gateway TCPRoute resource with invalid key", zap.Error(err), zap.String("key", key), ) return err } log.Debugw("sync TCPRoute", zap.String("key", key)) tcpRoute, err := c.controller.gatewayTCPRouteLister.TCPRoutes(namespace).Get(name) if err != nil { if !k8serrors.IsNotFound(err) { log.Errorw("failed to get Gateway TCPRoute", zap.Error(err), zap.String("key", key), ) return err } if ev.Type != types.EventDelete { log.Warnw("Gateway TCPRoute was deleted before process", zap.String("key", key), ) // Don't need to retry. return nil } } if ev.Type == types.EventDelete { if tcpRoute != nil { // We still find the resource while we are processing the DELETE event, // that means object with same namespace and name was created, discarding // this stale DELETE event. log.Warnw("discard the stale Gateway/TCPRoute delete event since it exists", zap.String("key", key), ) return nil } tcpRoute = ev.Tombstone.(*gatewayv1alpha2.TCPRoute) } err = c.controller.validator.ValidateCommonRoute(tcpRoute) if err != nil { log.Errorw("failed to validate gateway TCPRoute", zap.Error(err), zap.Any("object", tcpRoute), ) return err } tctx, err := c.controller.translator.TranslateGatewayTCPRouteV1Alpha2(tcpRoute) if err != nil { log.Errorw("failed to translate gateway TCPRoute", zap.Error(err), zap.Any("object", tcpRoute), ) return err } log.Debugw("translated TCPRoute", zap.Any("stream_routes", tctx.StreamRoutes), zap.Any("upstreams", tctx.Upstreams), ) m := &utils.Manifest{ StreamRoutes: tctx.StreamRoutes, Upstreams: tctx.Upstreams, } var ( added *utils.Manifest updated *utils.Manifest deleted *utils.Manifest ) if ev.Type == types.EventDelete { deleted = m } else if ev.Type == types.EventAdd { added = m } else { var oldCtx *translation.TranslateContext oldObj := ev.OldObject.(*gatewayv1alpha2.TCPRoute) oldCtx, _ = c.controller.translator.TranslateGatewayTCPRouteV1Alpha2(oldObj) if oldCtx != nil { om := &utils.Manifest{ StreamRoutes: oldCtx.StreamRoutes, Upstreams: oldCtx.Upstreams, } added, updated, deleted = m.Diff(om) } else { added = m } } return utils.SyncManifests(ctx, c.controller.APISIX, c.controller.APISIXClusterName, added, updated, deleted, false) } func (c *gatewayTCPRouteController) run(ctx context.Context) { log.Info("gateway TCPRoute controller started") defer log.Info("gateway TCPRoute controller exited") defer c.workqueue.ShutDown() if !cache.WaitForCacheSync(ctx.Done(), c.controller.gatewayTCPRouteInformer.HasSynced) { log.Error("sync Gateway TCPRoute cache failed") return } for i := 0; i < c.workers; i++ { go c.runWorker(ctx) } <-ctx.Done() } func (c *gatewayTCPRouteController) runWorker(ctx context.Context) { for { obj, quit := c.workqueue.Get() if quit { return } err := c.sync(ctx, obj.(*types.Event)) c.workqueue.Done(obj) c.handleSyncErr(obj, err) } } func (c *gatewayTCPRouteController) handleSyncErr(obj interface{}, err error) { if err == nil { c.workqueue.Forget(obj) c.controller.MetricsCollector.IncrSyncOperation("gateway_tcproute", "success") return } event := obj.(*types.Event) if k8serrors.IsNotFound(err) && event.Type != types.EventDelete { log.Infow("sync gateway TCPRoute but not found, ignore", zap.String("event_type", event.Type.String()), zap.String("TCPRoute ", event.Object.(string)), ) c.workqueue.Forget(event) return } log.Warnw("sync gateway TCPRoute failed, will retry", zap.Any("object", obj), zap.Error(err), ) c.workqueue.AddRateLimited(obj) c.controller.MetricsCollector.IncrSyncOperation("gateway_tcproute", "failure") } func (c *gatewayTCPRouteController) onAdd(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { log.Errorw("found gateway TCPRoute resource with bad meta namespace key", zap.Error(err), ) return } if !c.controller.NamespaceProvider.IsWatchingNamespace(key) { return } log.Debugw("gateway TCPRoute add event arrived", zap.Any("object", obj), ) c.workqueue.Add(&types.Event{ Type: types.EventAdd, Object: key, }) } func (c *gatewayTCPRouteController) onUpdate(oldObj, newObj interface{}) { oldTCPRoute := oldObj.(*gatewayv1alpha2.TCPRoute) newTCPRoute := newObj.(*gatewayv1alpha2.TCPRoute) if oldTCPRoute.ResourceVersion >= newTCPRoute.ResourceVersion { return } key, err := cache.MetaNamespaceKeyFunc(oldObj) if err != nil { log.Errorw("found gateway TCPRoute resource with bad meta namespace key", zap.Error(err), ) return } if !c.controller.NamespaceProvider.IsWatchingNamespace(key) { return } log.Debugw("gateway TCPRoute update event arrived", zap.Any("old object", oldObj), zap.Any("new object", newObj), ) c.workqueue.Add(&types.Event{ Type: types.EventUpdate, Object: key, OldObject: oldTCPRoute, }) } func (c *gatewayTCPRouteController) OnDelete(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { log.Errorw("found gateway TCPRoute resource with bad meta namespace key", zap.Error(err), ) return } if !c.controller.NamespaceProvider.IsWatchingNamespace(key) { return } log.Debugw("gateway TCPRoute delete event arrived", zap.Any("object", obj), ) c.workqueue.Add(&types.Event{ Type: types.EventDelete, Object: key, Tombstone: obj, }) }