npm/pkg/controlplane/controllers/v2/networkPolicyController.go (252 lines of code) (raw):
// Copyright 2018 Microsoft. All rights reserved.
// MIT License
package controllers
import (
"errors"
"fmt"
"reflect"
"sync"
"time"
"github.com/Azure/azure-container-networking/npm/metrics"
"github.com/Azure/azure-container-networking/npm/pkg/controlplane/translation"
"github.com/Azure/azure-container-networking/npm/pkg/dataplane"
"github.com/Azure/azure-container-networking/npm/util"
networkingv1 "k8s.io/api/networking/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
networkinginformers "k8s.io/client-go/informers/networking/v1"
netpollister "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)
var (
errNetPolKeyFormat = errors.New("invalid network policy key format")
errNetPolTranslationFailure = errors.New("failed to translate network policy")
)
type NetworkPolicyController struct {
sync.RWMutex
netPolLister netpollister.NetworkPolicyLister
workqueue workqueue.RateLimitingInterface
rawNpSpecMap map[string]*networkingv1.NetworkPolicySpec // Key is <nsname>/<policyname>
dp dataplane.GenericDataplane
npmLiteToggle bool
}
func (c *NetworkPolicyController) GetCache() map[string]*networkingv1.NetworkPolicySpec {
c.RLock()
defer c.RUnlock()
return c.rawNpSpecMap
}
func NewNetworkPolicyController(npInformer networkinginformers.NetworkPolicyInformer, dp dataplane.GenericDataplane, npmLiteToggle bool) *NetworkPolicyController {
netPolController := &NetworkPolicyController{
netPolLister: npInformer.Lister(),
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "NetworkPolicy"),
rawNpSpecMap: make(map[string]*networkingv1.NetworkPolicySpec),
dp: dp,
npmLiteToggle: npmLiteToggle,
}
npInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: netPolController.addNetworkPolicy,
UpdateFunc: netPolController.updateNetworkPolicy,
DeleteFunc: netPolController.deleteNetworkPolicy,
},
)
return netPolController
}
func (c *NetworkPolicyController) LengthOfRawNpMap() int {
return len(c.rawNpSpecMap)
}
// getNetworkPolicyKey returns namespace/name of network policy object if it is valid network policy object and has valid namespace/name.
// If not, it returns error.
func (c *NetworkPolicyController) getNetworkPolicyKey(obj interface{}) (string, error) {
var key string
_, ok := obj.(*networkingv1.NetworkPolicy)
if !ok {
return key, fmt.Errorf("cannot cast obj (%v) to network policy obj err: %w", obj, errNetPolKeyFormat)
}
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
return key, fmt.Errorf("error due to %w", err)
}
return key, nil
}
func (c *NetworkPolicyController) addNetworkPolicy(obj interface{}) {
netPolkey, err := c.getNetworkPolicyKey(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(netPolkey)
}
func (c *NetworkPolicyController) updateNetworkPolicy(old, newnetpol interface{}) {
netPolkey, err := c.getNetworkPolicyKey(newnetpol)
if err != nil {
utilruntime.HandleError(err)
return
}
// new network policy object is already checked validation by calling getNetworkPolicyKey function.
newNetPol, _ := newnetpol.(*networkingv1.NetworkPolicy)
oldNetPol, ok := old.(*networkingv1.NetworkPolicy)
if ok {
if oldNetPol.ResourceVersion == newNetPol.ResourceVersion {
// Periodic resync will send update events for all known network plicies.
// Two different versions of the same network policy will always have different RVs.
return
}
}
c.workqueue.Add(netPolkey)
}
func (c *NetworkPolicyController) deleteNetworkPolicy(obj interface{}) {
netPolObj, ok := obj.(*networkingv1.NetworkPolicy)
// DeleteFunc gets the final state of the resource (if it is known).
// Otherwise, it gets an object of type DeletedFinalStateUnknown.
// This can happen if the watch is closed and misses the delete event and
// the controller doesn't notice the deletion until the subsequent re-list
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
metrics.SendErrorLogAndMetric(util.NetpolID, "[NETPOL DELETE EVENT] Received unexpected object type: %v", obj)
return
}
if netPolObj, ok = tombstone.Obj.(*networkingv1.NetworkPolicy); !ok {
metrics.SendErrorLogAndMetric(util.NetpolID, "[NETPOL DELETE EVENT] Received unexpected object type (error decoding object tombstone, invalid type): %v", obj)
return
}
}
var netPolkey string
var err error
if netPolkey, err = cache.MetaNamespaceKeyFunc(netPolObj); err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(netPolkey)
}
func (c *NetworkPolicyController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Starting Network Policy worker")
go wait.Until(c.runWorker, time.Second, stopCh)
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Started Network Policy worker")
<-stopCh
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Info("Shutting down Network Policy workers")
}
func (c *NetworkPolicyController) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *NetworkPolicyController) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v, err %w", obj, errWorkqueueFormatting))
return nil
}
// Run the syncNetPol, passing it the namespace/name string of the
// network policy resource to be synced.
if err := c.syncNetPol(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %w, requeuing", key, err)
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
metrics.SendErrorLogAndMetric(util.NetpolID, "syncNetPol error due to %v", err)
return true
}
return true
}
// syncNetPol compares the actual state with the desired, and attempts to converge the two.
func (c *NetworkPolicyController) syncNetPol(key string) error {
// timer for recording execution times
timer := metrics.StartNewTimer()
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s err: %w", key, errNetPolKeyFormat))
return nil //nolint HandleError is used instead of returning error to caller
}
// record exec time after syncing
operationKind := metrics.NoOp
defer func() {
metrics.RecordControllerPolicyExecTime(timer, operationKind, err != nil)
}()
// Get the network policy resource with this namespace/name
netPolObj, err := c.netPolLister.NetworkPolicies(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
klog.Infof("Network Policy %s is not found, may be it is deleted", key)
if _, ok := c.rawNpSpecMap[key]; ok {
// record time to delete policy if it exists (can't call within cleanUpNetworkPolicy because this can be called by a pod update)
operationKind = metrics.DeleteOp
}
// netPolObj is not found, but should need to check the RawNpMap cache with key.
// cleanUpNetworkPolicy method will take care of the deletion of a cached network policy if the cached network policy exists with key in our RawNpMap cache.
err = c.cleanUpNetworkPolicy(key)
if err != nil {
return fmt.Errorf("[syncNetPol] error: %w when network policy is not found", err)
}
return err
}
return err
}
// If DeletionTimestamp of the netPolObj is set, start cleaning up lastly applied states.
// This is early cleaning up process from updateNetPol event
if netPolObj.ObjectMeta.DeletionTimestamp != nil || netPolObj.ObjectMeta.DeletionGracePeriodSeconds != nil {
if _, ok := c.rawNpSpecMap[key]; ok {
// record time to delete policy if it exists (can't call within cleanUpNetworkPolicy because this can be called by a pod update)
operationKind = metrics.DeleteOp
}
err = c.cleanUpNetworkPolicy(key)
if err != nil {
return fmt.Errorf("error: %w when ObjectMeta.DeletionTimestamp field is set", err)
}
return nil
}
cachedNetPolSpecObj, netPolExists := c.rawNpSpecMap[key]
if netPolExists {
// if network policy does not have different states against lastly applied states stored in cachedNetPolObj,
// netPolController does not need to reconcile this update.
// In this updateNetworkPolicy event,
// newNetPol was updated with states which netPolController does not need to reconcile.
if reflect.DeepEqual(cachedNetPolSpecObj, &netPolObj.Spec) {
return nil
}
}
operationKind, err = c.syncAddAndUpdateNetPol(netPolObj)
if err != nil {
return fmt.Errorf("[syncNetPol] error due to %w", err)
}
return nil
}
// syncAddAndUpdateNetPol handles a new network policy or an updated network policy object triggered by add and update events
func (c *NetworkPolicyController) syncAddAndUpdateNetPol(netPolObj *networkingv1.NetworkPolicy) (metrics.OperationKind, error) {
var err error
netpolKey, err := cache.MetaNamespaceKeyFunc(netPolObj)
if err != nil {
// consider a no-op since we can't determine add vs. update. The exec time here isn't important either.
return metrics.NoOp, fmt.Errorf("[syncAddAndUpdateNetPol] Error: while running MetaNamespaceKeyFunc err: %w", err)
}
// install translated rules into kernel
npmNetPolObj, err := translation.TranslatePolicy(netPolObj, c.npmLiteToggle)
if err != nil {
if isUnsupportedWindowsTranslationErr(err) {
klog.Warningf("NetworkPolicy %s in namespace %s is not translated because it has unsupported translated features of Windows: %s",
netPolObj.ObjectMeta.Name, netPolObj.ObjectMeta.Namespace, err.Error())
// We can safely suppress unsupported network policy because re-Queuing will result in same error.
// The exec time isn't relevant here, so consider a no-op.
return metrics.NoOp, nil
}
klog.Errorf("Failed to translate podSelector in NetworkPolicy %s in namespace %s: %s", netPolObj.ObjectMeta.Name, netPolObj.ObjectMeta.Namespace, err.Error())
// The exec time isn't relevant here, so consider a no-op. Returning nil to prevent re-queuing since this is not a transient error.
return metrics.NoOp, nil
}
_, policyExisted := c.rawNpSpecMap[netpolKey]
var operationKind metrics.OperationKind
if policyExisted {
operationKind = metrics.UpdateOp
} else {
operationKind = metrics.CreateOp
}
// install translated rules into Dataplane
// DP update policy call will check if this policy already exists in kernel
// if yes: then will delete old rules and program new rules
// if no: then will program add new rules
err = c.dp.UpdatePolicy(npmNetPolObj)
if err != nil {
// if error occurred the key is re-queued in workqueue and process this function again,
// which eventually meets desired states of network policy
return operationKind, fmt.Errorf("[syncAddAndUpdateNetPol] Error: failed to update translated NPMNetworkPolicy into Dataplane due to %w", err)
}
if !policyExisted {
// inc metric for NumPolicies only if it a new network policy
metrics.IncNumPolicies()
}
c.rawNpSpecMap[netpolKey] = &netPolObj.Spec
return operationKind, nil
}
// DeleteNetworkPolicy handles deleting network policy based on netPolKey.
func (c *NetworkPolicyController) cleanUpNetworkPolicy(netPolKey string) error {
_, cachedNetPolObjExists := c.rawNpSpecMap[netPolKey]
// if there is no applied network policy with the netPolKey, do not need to clean up process.
if !cachedNetPolObjExists {
return nil
}
err := c.dp.RemovePolicy(netPolKey)
if err != nil {
return fmt.Errorf("[cleanUpNetworkPolicy] Error: failed to remove policy due to %w", err)
}
// Success to clean up ipset and iptables operations in kernel and delete the cached network policy from RawNpMap
delete(c.rawNpSpecMap, netPolKey)
metrics.DecNumPolicies()
return nil
}
func isUnsupportedWindowsTranslationErr(err error) bool {
return errors.Is(err, translation.ErrUnsupportedNamedPort) ||
errors.Is(err, translation.ErrUnsupportedNegativeMatch) ||
errors.Is(err, translation.ErrUnsupportedSCTP) ||
errors.Is(err, translation.ErrUnsupportedExceptCIDR)
}