pkg/providers/k8s/secret.go (203 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package k8s
import (
"context"
"time"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"github.com/apache/apisix-ingress-controller/pkg/log"
apisixprovider "github.com/apache/apisix-ingress-controller/pkg/providers/apisix"
ingressprovider "github.com/apache/apisix-ingress-controller/pkg/providers/ingress"
"github.com/apache/apisix-ingress-controller/pkg/providers/k8s/namespace"
providertypes "github.com/apache/apisix-ingress-controller/pkg/providers/types"
"github.com/apache/apisix-ingress-controller/pkg/types"
)
type secretController struct {
*providertypes.Common
workqueue workqueue.RateLimitingInterface
workers int
secretLister listerscorev1.SecretLister
secretInformer cache.SharedIndexInformer
namespaceProvider namespace.WatchingNamespaceProvider
apisixProvider apisixprovider.Provider
ingressProvider ingressprovider.Provider
}
func newSecretController(common *providertypes.Common, namespaceProvider namespace.WatchingNamespaceProvider,
apisixProvider apisixprovider.Provider, ingressProvider ingressprovider.Provider) *secretController {
c := &secretController{
Common: common,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "Secrets"),
workers: 1,
secretLister: common.SecretLister,
secretInformer: common.SecretInformer,
namespaceProvider: namespaceProvider,
apisixProvider: apisixProvider,
ingressProvider: ingressProvider,
}
c.secretInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
UpdateFunc: c.onUpdate,
DeleteFunc: c.onDelete,
},
)
return c
}
func (c *secretController) run(ctx context.Context) {
log.Info("secret controller started")
defer log.Info("secret controller exited")
defer c.workqueue.ShutDown()
if ok := cache.WaitForCacheSync(ctx.Done(), c.secretInformer.HasSynced); !ok {
log.Error("informers sync failed")
return
}
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
}
<-ctx.Done()
}
func (c *secretController) runWorker(ctx context.Context) {
for {
obj, quit := c.workqueue.Get()
if quit {
return
}
err := c.sync(ctx, obj.(*types.Event))
c.workqueue.Done(obj)
c.handleSyncErr(obj, err)
}
}
func (c *secretController) sync(ctx context.Context, ev *types.Event) error {
key := ev.Object.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.Errorf("invalid resource key: %s", key)
return err
}
sec, err := c.secretLister.Secrets(namespace).Get(name)
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorw("failed to get Secret",
zap.String("key", key),
zap.Error(err),
)
return err
}
if ev.Type != types.EventDelete {
log.Warnw("Secret was deleted before it can be delivered",
zap.String("key", key),
)
return nil
}
}
if ev.Type == types.EventDelete {
if sec != nil {
// We still find the resource while we are processing the DELETE event,
// that means object with same namespace and name was created, discarding
// this stale DELETE event.
log.Warnw("discard the stale secret delete event since the resource still exists",
zap.String("key", key),
)
return nil
}
sec = ev.Tombstone.(*corev1.Secret)
}
log.Debugw("sync secret change",
zap.String("key", key),
)
secretKey := namespace + "/" + name
c.apisixProvider.SyncSecretChange(ctx, ev, sec, secretKey)
c.ingressProvider.SyncSecretChange(ctx, ev, sec, secretKey)
return nil
}
func (c *secretController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
c.MetricsCollector.IncrSyncOperation("secret", "success")
return
}
event := obj.(*types.Event)
if k8serrors.IsNotFound(err) && event.Type != types.EventDelete {
log.Infow("sync secret but not found, ignore",
zap.String("event_type", event.Type.String()),
zap.String("secret", event.Object.(string)),
)
c.workqueue.Forget(event)
return
}
log.Warnw("sync secret failed, will retry",
zap.Any("object", obj),
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
c.MetricsCollector.IncrSyncOperation("secret", "failure")
}
func (c *secretController) onAdd(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorf("found secret object with bad namespace/name: %s, ignore it", err)
return
}
if !c.namespaceProvider.IsWatchingNamespace(key) {
return
}
log.Debugw("secret add event arrived",
zap.String("key", key),
)
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: key,
})
c.MetricsCollector.IncrEvents("secret", "add")
}
func (c *secretController) onUpdate(prev, curr interface{}) {
prevSec := prev.(*corev1.Secret)
currSec := curr.(*corev1.Secret)
if prevSec.GetResourceVersion() >= currSec.GetResourceVersion() {
return
}
key, err := cache.MetaNamespaceKeyFunc(currSec)
if err != nil {
log.Errorf("found secrets object with bad namespace/name: %s, ignore it", err)
return
}
if !c.namespaceProvider.IsWatchingNamespace(key) {
return
}
log.Debugw("secret update event arrived",
zap.Any("secret name", curr.(*corev1.Secret).ObjectMeta.Name),
)
c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
Object: key,
})
c.MetricsCollector.IncrEvents("secret", "update")
}
func (c *secretController) onDelete(obj interface{}) {
sec, ok := obj.(*corev1.Secret)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
log.Errorf("found secrets: %+v in bad tombstone state", obj)
return
}
sec = tombstone.Obj.(*corev1.Secret)
}
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorf("found secret resource with bad meta namespace key: %s", err)
return
}
// FIXME Refactor Controller.isWatchingNamespace to just use
// namespace after all controllers use the same way to fetch
// the object.
if !c.namespaceProvider.IsWatchingNamespace(key) {
return
}
log.Debugw("secret delete event arrived",
zap.Any("final state", sec),
)
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: key,
Tombstone: sec,
})
c.MetricsCollector.IncrEvents("secret", "delete")
}