pkg/providers/k8s/namespace/namespace.go (149 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package namespace
import (
"context"
"time"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
)
// FIXME: Controller should be the Core Part,
// Provider should act as "EventHandler", register there functions to Controller
type EventHandler interface {
OnAdd()
OnUpdate()
OnDelete()
}
type namespaceController struct {
syncCh chan string
controller *watchingProvider
workqueue workqueue.RateLimitingInterface
workers int
}
func newNamespaceController(c *watchingProvider, syncCh chan string) *namespaceController {
ctl := &namespaceController{
syncCh: syncCh,
controller: c,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "Namespace"),
workers: 1,
}
ctl.controller.namespaceInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: ctl.onAdd,
UpdateFunc: ctl.onUpdate,
DeleteFunc: ctl.onDelete,
},
)
return ctl
}
func (c *namespaceController) run(ctx context.Context) {
if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.namespaceInformer.HasSynced); !ok {
log.Error("namespace informers sync failed")
return
}
log.Info("namespace controller started")
defer log.Info("namespace controller exited")
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
}
<-ctx.Done()
}
func (c *namespaceController) runWorker(ctx context.Context) {
for {
obj, quit := c.workqueue.Get()
if quit {
return
}
err := c.sync(ctx, obj.(*types.Event))
c.workqueue.Done(obj)
c.handleSyncErr(obj.(*types.Event), err)
}
}
func (c *namespaceController) sync(ctx context.Context, ev *types.Event) error {
if ev.Type != types.EventDelete {
// check the labels of specify namespace
namespace, err := c.controller.kube.Client.CoreV1().Namespaces().Get(ctx, ev.Object.(string), metav1.GetOptions{})
if err != nil {
return err
}
// if labels of namespace contains the watchingLabels, the namespace should be set to controller.watchingNamespaces
if c.controller.watchingMultiValuedLabels.IsSubsetOf(namespace.Labels) {
log.Infow("watching namespace", zap.String("name", namespace.Name))
if _, ok := c.controller.watchingNamespaces.Load(namespace.Name); !ok {
c.controller.watchingNamespaces.Store(namespace.Name, struct{}{})
if c.syncCh != nil {
log.Infof("resync resource in namespace %s", namespace.Name)
c.syncCh <- namespace.Name
}
}
} else {
log.Infow("un-watching namespace", zap.String("name", namespace.Name))
c.controller.watchingNamespaces.Delete(namespace.Name)
}
} else { // type == types.EventDelete
namespace := ev.Tombstone.(*corev1.Namespace)
if _, ok := c.controller.watchingNamespaces.Load(namespace.Name); ok {
log.Infow("un-watching namespace", zap.String("name", namespace.Name))
c.controller.watchingNamespaces.Delete(namespace.Name)
}
// do nothing, if the namespace did not in controller.watchingNamespaces
}
return nil
}
func (c *namespaceController) handleSyncErr(event *types.Event, err error) {
name := event.Object.(string)
if err != nil {
if k8serrors.IsNotFound(err) && event.Type != types.EventDelete {
log.Infow("sync namespace but not found, ignore",
zap.String("event_type", event.Type.String()),
zap.String("namespace", event.Object.(string)),
)
c.workqueue.Forget(event)
return
}
log.Warnw("sync namespace info failed, will retry",
zap.String("namespace", name),
zap.Error(err),
)
c.workqueue.AddRateLimited(event)
} else {
c.workqueue.Forget(event)
}
}
func (c *namespaceController) onAdd(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorf("found Namespace resource with error: %v", err)
return
}
log.Debugw("namespace add event arrived",
zap.Any("namespace", obj),
)
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: key,
})
}
func (c *namespaceController) onUpdate(pre, cur interface{}) {
oldNamespace := pre.(*corev1.Namespace)
newNamespace := cur.(*corev1.Namespace)
if oldNamespace.ResourceVersion >= newNamespace.ResourceVersion {
return
}
key, err := cache.MetaNamespaceKeyFunc(cur)
if err != nil {
log.Errorf("found Namespace resource with error: %v", err)
return
}
c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
Object: key,
})
}
func (c *namespaceController) onDelete(obj interface{}) {
namespace := obj.(*corev1.Namespace)
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: namespace.Name,
Tombstone: namespace,
})
}