pkg/providers/k8s/namespace/namespace_provider.go (140 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package namespace
import (
"context"
"fmt"
"strings"
"sync"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/kube"
"github.com/apache/apisix-ingress-controller/pkg/log"
provider "github.com/apache/apisix-ingress-controller/pkg/providers/types"
"github.com/apache/apisix-ingress-controller/pkg/providers/utils"
"github.com/apache/apisix-ingress-controller/pkg/types"
)
type WatchingNamespaceProvider interface {
provider.Provider
Init(ctx context.Context) error
IsWatchingNamespace(key string) bool
WatchingNamespaces() []string
}
type watchingProvider struct {
kube *kube.KubeClient
cfg *config.Config
watchingNamespaces *sync.Map
watchingMultiValuedLabels types.MultiValueLabels
namespaceInformer cache.SharedIndexInformer
namespaceLister listerscorev1.NamespaceLister
controller *namespaceController
enableLabelsWatching bool
}
func NewWatchingNamespaceProvider(ctx context.Context, kube *kube.KubeClient, cfg *config.Config, syncCh chan string) (WatchingNamespaceProvider, error) {
c := &watchingProvider{
kube: kube,
cfg: cfg,
watchingNamespaces: new(sync.Map),
watchingMultiValuedLabels: make(map[string][]string),
enableLabelsWatching: false,
}
if len(cfg.Kubernetes.NamespaceSelector) == 0 {
return c, nil
}
// support namespace label-selector
c.enableLabelsWatching = true
for _, selector := range cfg.Kubernetes.NamespaceSelector {
labelSlice := strings.Split(selector, "=")
if len(labelSlice) != 2 {
return nil, fmt.Errorf("bad namespace-selector format: %s, expected namespace-selector format: xxx=xxx", selector)
}
c.watchingMultiValuedLabels[labelSlice[0]] = append(c.watchingMultiValuedLabels[labelSlice[0]], labelSlice[1])
}
kubeFactory := kube.NewSharedIndexInformerFactory()
c.namespaceInformer = kubeFactory.Core().V1().Namespaces().Informer()
c.namespaceLister = kubeFactory.Core().V1().Namespaces().Lister()
c.controller = newNamespaceController(c, syncCh)
return c, nil
}
func (c *watchingProvider) Init(ctx context.Context) error {
err := c.initWatchingNamespacesByLabels(ctx)
if err != nil {
return err
}
return nil
}
func (c *watchingProvider) initWatchingNamespacesByLabels(ctx context.Context) error {
for _, q := range c.watchingMultiValuedLabels.BuildQuery() {
opts := metav1.ListOptions{
LabelSelector: q,
}
namespaces, err := c.kube.Client.CoreV1().Namespaces().List(ctx, opts)
if err != nil {
return err
}
var nss []string
for _, ns := range namespaces.Items {
nss = append(nss, ns.Name)
c.watchingNamespaces.Store(ns.Name, struct{}{})
}
log.Infow("label selector watching namespaces", zap.Strings("namespaces", nss))
//---
namespaces, err = c.kube.Client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
var tnss []string
for _, ns := range namespaces.Items {
tnss = append(tnss, ns.Name)
}
log.Infow("total namespaces", zap.Strings("namespaces", tnss))
}
return nil
}
func (c *watchingProvider) Run(ctx context.Context) {
if !c.enableLabelsWatching {
return
}
e := utils.ParallelExecutor{}
e.Add(func() {
c.namespaceInformer.Run(ctx.Done())
})
e.Add(func() {
c.controller.run(ctx)
})
e.Wait()
}
func (c *watchingProvider) WatchingNamespaces() []string {
var keys []string
if c.enableLabelsWatching {
c.watchingNamespaces.Range(func(key, _ interface{}) bool {
keys = append(keys, key.(string))
return true
})
} else {
namespaces, err := c.kube.Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Warnw("Namespace list get failed",
zap.Error(err),
)
return nil
}
for _, ns := range namespaces.Items {
keys = append(keys, ns.Name)
}
}
return keys
}
// IsWatchingNamespace accepts a resource key, getting the namespace part
// and checking whether the namespace is being watched.
func (c *watchingProvider) IsWatchingNamespace(key string) (ok bool) {
if !c.enableLabelsWatching {
ok = true
return
}
ns, _, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
// Ignore resource with invalid key.
ok = false
log.Warnf("resource %s was ignored since: %s", key, err)
return
}
_, ok = c.watchingNamespaces.Load(ns)
return
}