pkg/config/configmap_manager.go (120 lines of code) (raw):

package config import ( "context" "errors" "github.com/aws/amazon-network-policy-controller-k8s/pkg/k8s" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" ) // +kubebuilder:rbac:groups="",resources=configmaps,namespace="system",resourceNames=amazon-vpc-cni,verbs=get;list;watch type ConfigmapManager interface { MonitorConfigMap(ctx context.Context) error IsControllerEnabled() bool } var _ ConfigmapManager = (*defaultConfigmapManager)(nil) type defaultConfigmapManager struct { initialState bool resourceRef types.NamespacedName store cache.Store rt *cache.Reflector clientSet *kubernetes.Clientset logger logr.Logger cancelFn context.CancelFunc monitorStopChan chan struct{} storeNotifyChan chan struct{} configMapCheckFunction func(*corev1.ConfigMap) bool } func NewConfigmapManager(resourceRef types.NamespacedName, clientSet *kubernetes.Clientset, cancelFn context.CancelFunc, configmapCheckFunction func(configMap *corev1.ConfigMap) bool, logger logr.Logger) *defaultConfigmapManager { storeNotifyChan := make(chan struct{}) cmStore := k8s.NewConfigMapStore(storeNotifyChan) return &defaultConfigmapManager{ clientSet: clientSet, resourceRef: resourceRef, store: cmStore, logger: logger, cancelFn: cancelFn, monitorStopChan: make(chan struct{}), storeNotifyChan: storeNotifyChan, configMapCheckFunction: configmapCheckFunction, } } // IsControllerEnabled returns the initial state of the policy controller. func (m *defaultConfigmapManager) IsControllerEnabled() bool { m.logger.V(1).Info("IsControllerEnabled() returning", "value", m.initialState) return m.initialState } // MonitorConfigMap starts cache reflector and watches for configmap updates. func (m *defaultConfigmapManager) MonitorConfigMap(ctx context.Context) error { fieldSelector := fields.Set{"metadata.name": m.resourceRef.Name}.AsSelector().String() listFunc := func(options metav1.ListOptions) (runtime.Object, error) { options.FieldSelector = fieldSelector return m.clientSet.CoreV1().ConfigMaps(m.resourceRef.Namespace).List(ctx, options) } watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { options.FieldSelector = fieldSelector return m.clientSet.CoreV1().ConfigMaps(m.resourceRef.Namespace).Watch(ctx, options) } m.rt = cache.NewReflector(&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}, &corev1.ConfigMap{}, m.store, 0, ) go m.rt.Run(m.monitorStopChan) go m.listenForConfigMapUpdates() if _, err := m.setInitialControllerState(); err != nil { m.logger.Info("Failed to set initial state", "err", err) return err } return nil } // listen for the messages in the storeNotifyChan in a loop and update the state of the policy controller accordingly. func (m *defaultConfigmapManager) listenForConfigMapUpdates() { defer func() { m.logger.Info("Controller detected changes to the configmap, cancelling manager context") close(m.monitorStopChan) m.cancelFn() }() for { select { case <-m.storeNotifyChan: enabled, err := m.getCurrentEnabledConfig() if err != nil { m.logger.Error(err, "Failed to get controller state from configmap") return } m.logger.V(1).Info("Received configmap notification", "initial", m.initialState, "new", enabled) if m.initialState != enabled { m.logger.Info("Controller state changed", "initial", m.initialState, "new", enabled) return } } } } // getCurrentEnabledConfig gets the current state of the policy controller from the configmap func (m *defaultConfigmapManager) getCurrentEnabledConfig() (bool, error) { cm, exists, err := m.store.GetByKey(m.resourceRef.String()) if err != nil { return false, err } if !exists { return false, nil } return m.configMapCheckFunction(cm.(*corev1.ConfigMap)), nil } // setInitialControllerState sets the initial state of the policy controller based on the configmap func (m *defaultConfigmapManager) setInitialControllerState() (retVal bool, err error) { defer func() { m.logger.V(1).Info("setInitialControllerState", "retVal", retVal, "err", err) m.initialState = retVal }() // Wait for cache sync if !cache.WaitForCacheSync(m.monitorStopChan, func() bool { return m.rt.LastSyncResourceVersion() != "" }) { return false, errors.New("failed to sync configmap cache") } return m.getCurrentEnabledConfig() }