pkg/providers/k8s/configmap/configmap.go (251 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package configmap import ( "context" "time" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/providers/k8s/configmap/translation" providertypes "github.com/apache/apisix-ingress-controller/pkg/providers/types" "github.com/apache/apisix-ingress-controller/pkg/providers/utils" "github.com/apache/apisix-ingress-controller/pkg/types" ) type subscripKey struct { namespace string name string } type configmapController struct { *providertypes.Common workqueue workqueue.RateLimitingInterface workers int subscriptionList map[subscripKey]struct{} } func newConfigMapController(common *providertypes.Common) *configmapController { ctl := &configmapController{ workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ConfigMap"), workers: 1, subscriptionList: map[subscripKey]struct{}{}, Common: common, } ctl.ConfigMapInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: ctl.onAdd, UpdateFunc: ctl.onUpdate, DeleteFunc: ctl.onDelete, }, ) return ctl } func (c *configmapController) Subscribe(namespace, configName string) { c.subscriptionList[subscripKey{ namespace: namespace, name: configName, }] = struct{}{} } func (c *configmapController) IsSubscribing(key string) bool { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return false } _, ok := c.subscriptionList[subscripKey{ namespace: namespace, name: name, }] return ok } func (c *configmapController) run(ctx context.Context) { log.Info("configmap controller started") defer log.Info("configmap controller exited") for i := 0; i < c.workers; i++ { go c.runWorker(ctx) } <-ctx.Done() } func (c *configmapController) runWorker(ctx context.Context) { for { obj, quit := c.workqueue.Get() if quit { return } err := c.sync(ctx, obj.(*types.Event)) c.workqueue.Done(obj) c.handleSyncErr(obj.(*types.Event), err) } } func (c *configmapController) sync(ctx context.Context, ev *types.Event) error { key := ev.Object.(string) log.Debugw("configmap sync event arrived", zap.String("event_type", ev.Type.String()), zap.Any("key", ev.Object), ) namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { log.Errorf("invalid resource key: %s", key) return err } cm, err := c.ConfigMapLister.ConfigMaps(namespace).Get(name) if err != nil { if !k8serrors.IsNotFound(err) { log.Errorw("sync failed, unable to get ConfigMap", zap.String("key", key), zap.Error(err), ) return err } if ev.Type != types.EventDelete { log.Warnw("configmap was deleted before it can be delivered", zap.String("key", key), ) return nil } cm = ev.Tombstone.(*corev1.ConfigMap) } var ( configmap *translation.ConfigMap oldConfigmap *translation.ConfigMap ) configmap, err = translation.TranslateConfigMap(cm) if err != nil { return err } if ev.Type == types.EventUpdate { oldConfigmap, _ = translation.TranslateConfigMap(ev.OldObject.(*corev1.ConfigMap)) } for clusterName, pluginMetadatas := range configmap.ConfigYaml.Data { m := &utils.Manifest{ PluginMetadatas: pluginMetadatas, } var ( added *utils.Manifest updated *utils.Manifest deleted *utils.Manifest ) if ev.Type == types.EventDelete { deleted = m } else if ev.Type == types.EventAdd { added = m } else { if oldConfigmap != nil { oldPluginMetadatas := oldConfigmap.ConfigYaml.Data[clusterName] om := &utils.Manifest{ PluginMetadatas: oldPluginMetadatas, } added, updated, deleted = m.Diff(om) } } log.Debugw("sync ApisixGlobalRule to cluster", zap.String("event_type", ev.Type.String()), zap.Any("add", added), zap.Any("update", updated), zap.Any("delete", deleted), ) if err := c.SyncClusterManifests(ctx, clusterName, added, updated, deleted, false); err != nil { log.Errorw("sync cluster failed", zap.Error(err)) return err } } if ev.Type == types.EventUpdate && oldConfigmap != nil { if oldConfigmap == nil { return nil } for clusterName, pluginMetadatas := range oldConfigmap.ConfigYaml.Data { if _, ok := configmap.ConfigYaml.Data[clusterName]; !ok { deleted := &utils.Manifest{ PluginMetadatas: pluginMetadatas, } log.Debugw("sync configmap to cluster", zap.String("event_type", ev.Type.String()), zap.Any("delete", deleted), ) if err := c.SyncClusterManifests(ctx, clusterName, nil, nil, deleted, false); err != nil { log.Errorw("sync cluster failed", zap.Error(err)) } } } } return nil } func (c *configmapController) handleSyncErr(event *types.Event, err error) { key := event.Object.(string) if err != nil { if k8serrors.IsNotFound(err) && event.Type != types.EventDelete { log.Infow("sync configmap but not found, ignore", zap.String("event_type", event.Type.String()), zap.Any("key", key), ) c.workqueue.Forget(event) return } log.Warnw("sync configmap info failed, will retry", zap.String("key", key), zap.Error(err), ) c.workqueue.AddRateLimited(event) } else { c.workqueue.Forget(event) } } func (c *configmapController) onAdd(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { log.Errorf("found ConfigMap resource with error: %v", err) return } if !c.IsSubscribing(key) { return } log.Debugw("configmap add event arrived", zap.String("key", key), zap.Any("object", obj), ) c.workqueue.Add(&types.Event{ Type: types.EventAdd, Object: key, }) } func (c *configmapController) onUpdate(pre, cur interface{}) { old := pre.(*corev1.ConfigMap) new := cur.(*corev1.ConfigMap) if old.ResourceVersion >= new.ResourceVersion { return } key, err := cache.MetaNamespaceKeyFunc(cur) if err != nil { log.Errorf("found ConfigMap resource with error: %v", err) return } if !c.IsSubscribing(key) { return } log.Debugw("configmap update event arrived", zap.String("key", key), zap.Any("object", new), ) c.workqueue.Add(&types.Event{ Type: types.EventUpdate, Object: key, OldObject: old, }) } func (c *configmapController) onDelete(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { log.Errorf("found ConfigMap resource with error: %v", err) return } if !c.IsSubscribing(key) { return } log.Debugw("configmap delete event arrived", zap.String("key", key), zap.Any("object", obj), ) c.workqueue.Add(&types.Event{ Type: types.EventDelete, Object: key, Tombstone: obj, }) }