// pkg/policyendpoints/manager.go
package policyendpoints
import (
"context"
"crypto/sha256"
"encoding/hex"
"strconv"
"golang.org/x/exp/maps"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/samber/lo"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
policyinfo "github.com/aws/amazon-network-policy-controller-k8s/api/v1alpha1"
"github.com/aws/amazon-network-policy-controller-k8s/pkg/k8s"
"github.com/aws/amazon-network-policy-controller-k8s/pkg/resolvers"
)
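// PolicyEndpointsManager manages the PolicyEndpoint custom resources derived from
// Kubernetes NetworkPolicy objects.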
type PolicyEndpointsManager interface {
Reconcile(ctx context.Context, policy *networking.NetworkPolicy) error
Cleanup(ctx context.Context, policy *networking.NetworkPolicy) error
}
// NewPolicyEndpointsManager constructs a new policyEndpointsManager
func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, logger logr.Logger) *policyEndpointsManager {
endpointsResolver := resolvers.NewEndpointsResolver(k8sClient, logger.WithName("endpoints-resolver"))
return &policyEndpointsManager{
k8sClient: k8sClient,
endpointsResolver: endpointsResolver,
endpointChunkSize: endpointChunkSize,
logger: logger,
}
}
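// Illustrative usage (a sketch, not code from this package): a controller would
// typically construct a single manager and call Reconcile/Cleanup from its
// NetworkPolicy reconcile loop. The chunk size of 200 and the surrounding
// reconciler wiring below are assumptions for illustration only.
//
//	pem := NewPolicyEndpointsManager(mgr.GetClient(), 200, ctrl.Log.WithName("policy-endpoints-manager"))
//	if err := pem.Reconcile(ctx, policy); err != nil {
//		return err
//	}
//	// on NetworkPolicy deletion:
//	if err := pem.Cleanup(ctx, policy); err != nil {
//		return err
//	}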
var _ PolicyEndpointsManager = (*policyEndpointsManager)(nil)
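// policyEndpointsManager resolves the endpoints selected by a network policy and
// bin-packs them into PolicyEndpoint resources, filling each rule list up to
// endpointChunkSize entries.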
type policyEndpointsManager struct {
k8sClient client.Client
endpointsResolver resolvers.EndpointsResolver
endpointChunkSize int
logger logr.Logger
}
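// Reconcile resolves the endpoints for the given network policy, computes the
// PolicyEndpoint resources to create, update and delete, and applies those changes
// through the API server.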
func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networking.NetworkPolicy) error {
ingressRules, egressRules, podSelectorEndpoints, err := m.endpointsResolver.Resolve(ctx, policy)
if err != nil {
return err
}
policyEndpointList := &policyinfo.PolicyEndpointList{}
if err := m.k8sClient.List(ctx, policyEndpointList,
client.InNamespace(policy.Namespace),
client.MatchingFields{IndexKeyPolicyReferenceName: policy.Name}); err != nil {
return err
}
existingPolicyEndpoints := make([]policyinfo.PolicyEndpoint, 0, len(policyEndpointList.Items))
for _, policyEndpoint := range policyEndpointList.Items {
existingPolicyEndpoints = append(existingPolicyEndpoints, policyEndpoint)
}
createList, updateList, deleteList, err := m.computePolicyEndpoints(policy, existingPolicyEndpoints, ingressRules, egressRules, podSelectorEndpoints)
if err != nil {
return err
}
m.logger.Info("Got policy endpoints lists", "create", len(createList), "update", len(updateList), "delete", len(deleteList))
for _, policyEndpoint := range createList {
if err := m.k8sClient.Create(ctx, &policyEndpoint); err != nil {
return err
}
m.logger.Info("Created policy endpoint", "id", k8s.NamespacedName(&policyEndpoint))
}
for _, policyEndpoint := range updateList {
oldRes := &policyinfo.PolicyEndpoint{}
if err := m.k8sClient.Get(ctx, k8s.NamespacedName(&policyEndpoint), oldRes); err != nil {
return err
}
if equality.Semantic.DeepEqual(oldRes.Spec, policyEndpoint.Spec) {
m.logger.V(1).Info("Policy endpoint already up to date", "id", k8s.NamespacedName(&policyEndpoint))
continue
}
if err := m.k8sClient.Patch(ctx, &policyEndpoint, client.MergeFrom(oldRes)); err != nil {
return err
}
m.logger.Info("Updated policy endpoint", "id", k8s.NamespacedName(&policyEndpoint))
}
for _, policyEndpoint := range deleteList {
if err := m.k8sClient.Delete(ctx, &policyEndpoint); err != nil {
return err
}
m.logger.Info("Deleted policy endpoint", "id", k8s.NamespacedName(&policyEndpoint))
}
return nil
}
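// Cleanup deletes all PolicyEndpoint resources that reference the given network policy.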
func (m *policyEndpointsManager) Cleanup(ctx context.Context, policy *networking.NetworkPolicy) error {
policyEndpointList := &policyinfo.PolicyEndpointList{}
if err := m.k8sClient.List(ctx, policyEndpointList,
client.InNamespace(policy.Namespace),
client.MatchingFields{IndexKeyPolicyReferenceName: policy.Name}); err != nil {
return errors.Wrap(err, "unable to list policyendpoints")
}
for _, policyEndpoint := range policyEndpointList.Items {
if err := m.k8sClient.Delete(ctx, &policyEndpoint); err != nil {
return errors.Wrap(err, "unable to delete policyendpoint")
}
m.logger.Info("Deleted policy endpoint", "id", k8s.NamespacedName(&policyEndpoint))
}
return nil
}
// computePolicyEndpoints computes the policy endpoints for the given policy
// The return values are the lists of policy endpoints to create, update and delete.
func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.NetworkPolicy,
existingPolicyEndpoints []policyinfo.PolicyEndpoint, ingressEndpoints []policyinfo.EndpointInfo,
egressEndpoints []policyinfo.EndpointInfo, podSelectorEndpoints []policyinfo.PodEndpoint) ([]policyinfo.PolicyEndpoint,
[]policyinfo.PolicyEndpoint, []policyinfo.PolicyEndpoint, error) {
// index the resolved ingress, egress and pod selector endpoints, then reconcile them
// against the existing policy endpoints to find what is still in use and what is new
ingressEndpointsMap, egressEndpointsMap, podSelectorEndpointSet, modifiedEndpoints, potentialDeletes := m.processExistingPolicyEndpoints(
policy, existingPolicyEndpoints, ingressEndpoints, egressEndpoints, podSelectorEndpoints,
)
doNotDelete := sets.Set[types.NamespacedName]{}
var createPolicyEndpoints []policyinfo.PolicyEndpoint
var updatePolicyEndpoints []policyinfo.PolicyEndpoint
var deletePolicyEndpoints []policyinfo.PolicyEndpoint
// packing new ingress rules
createPolicyEndpoints, doNotDeleteIngress := m.packingIngressRules(policy, ingressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
// packing new egress rules
createPolicyEndpoints, doNotDeleteEgress := m.packingEgressRules(policy, egressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
// packing new pod selector
createPolicyEndpoints, doNotDeletePs := m.packingPodSelectorEndpoints(policy, podSelectorEndpointSet.UnsortedList(), createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
doNotDelete.Insert(doNotDeleteIngress.UnsortedList()...)
doNotDelete.Insert(doNotDeleteEgress.UnsortedList()...)
doNotDelete.Insert(doNotDeletePs.UnsortedList()...)
for _, ep := range potentialDeletes {
if doNotDelete.Has(k8s.NamespacedName(&ep)) {
updatePolicyEndpoints = append(updatePolicyEndpoints, ep)
} else {
deletePolicyEndpoints = append(deletePolicyEndpoints, ep)
}
}
updatePolicyEndpoints = append(updatePolicyEndpoints, modifiedEndpoints...)
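// if nothing is being created or updated, keep at least one PolicyEndpoint for the
// policy: reuse a deletion candidate when one exists, otherwise create an empty one,
// so the policy's pod isolation remains represented.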
if len(createPolicyEndpoints) == 0 && len(updatePolicyEndpoints) == 0 {
if len(deletePolicyEndpoints) == 0 {
newEP := m.newPolicyEndpoint(policy, nil, nil, nil)
createPolicyEndpoints = append(createPolicyEndpoints, newEP)
} else {
ep := deletePolicyEndpoints[0]
updatePolicyEndpoints = append(updatePolicyEndpoints, ep)
deletePolicyEndpoints = deletePolicyEndpoints[1:]
}
}
return m.processPolicyEndpoints(createPolicyEndpoints), m.processPolicyEndpoints(updatePolicyEndpoints), deletePolicyEndpoints, nil
}
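// processPolicyEndpoints normalizes the rule lists of each policy endpoint by merging
// ingress and egress entries that share the same CIDR.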
func (m *policyEndpointsManager) processPolicyEndpoints(pes []policyinfo.PolicyEndpoint) []policyinfo.PolicyEndpoint {
var newPEs []policyinfo.PolicyEndpoint
for _, pe := range pes {
pe.Spec.Ingress = combineRulesEndpoints(pe.Spec.Ingress)
pe.Spec.Egress = combineRulesEndpoints(pe.Spec.Egress)
newPEs = append(newPEs, pe)
}
return newPEs
}
// combineRulesEndpoints consolidates ingress or egress entries that belong to the same CIDR into a single entry, merging their ports and except lists.
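// For example, one entry for 10.0.0.0/24 allowing TCP/80 and another entry for
// 10.0.0.0/24 allowing TCP/443 are merged into a single 10.0.0.0/24 entry carrying
// both ports (the CIDR and ports here are illustrative values only).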
func combineRulesEndpoints(ingressEndpoints []policyinfo.EndpointInfo) []policyinfo.EndpointInfo {
combinedMap := make(map[string]policyinfo.EndpointInfo)
for _, iep := range ingressEndpoints {
if _, ok := combinedMap[string(iep.CIDR)]; ok {
tempIEP := combinedMap[string(iep.CIDR)]
tempIEP.Ports = append(combinedMap[string(iep.CIDR)].Ports, iep.Ports...)
tempIEP.Except = append(combinedMap[string(iep.CIDR)].Except, iep.Except...)
combinedMap[string(iep.CIDR)] = tempIEP
} else {
combinedMap[string(iep.CIDR)] = iep
}
}
if len(combinedMap) > 0 {
return maps.Values(combinedMap)
}
return nil
}
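// newPolicyEndpoint builds a PolicyEndpoint for the given policy with the supplied
// ingress rules, egress rules and pod selector endpoints. The network policy is set
// as the controlling owner reference so the PolicyEndpoint is garbage collected when
// the policy is deleted.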
func (m *policyEndpointsManager) newPolicyEndpoint(policy *networking.NetworkPolicy,
ingressRules []policyinfo.EndpointInfo, egressRules []policyinfo.EndpointInfo,
podSelectorEndpoints []policyinfo.PodEndpoint) policyinfo.PolicyEndpoint {
blockOwnerDeletion := true
isController := true
policyEndpoint := policyinfo.PolicyEndpoint{
ObjectMeta: metav1.ObjectMeta{
Namespace: policy.Namespace,
GenerateName: policy.Name + "-",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "networking.k8s.io/v1",
Kind: "NetworkPolicy",
Name: policy.Name,
UID: policy.UID,
BlockOwnerDeletion: &blockOwnerDeletion,
Controller: &isController,
},
},
},
Spec: policyinfo.PolicyEndpointSpec{
PodSelector: &policy.Spec.PodSelector,
PodSelectorEndpoints: podSelectorEndpoints,
PolicyRef: policyinfo.PolicyReference{
Namespace: policy.Namespace,
Name: policy.Name,
},
PodIsolation: policy.Spec.PolicyTypes,
Ingress: ingressRules,
Egress: egressRules,
},
}
return policyEndpoint
}
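// getListOfEndpointInfoFromHash maps a list of endpoint hash keys back to their
// EndpointInfo values using the provided lookup map.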
func (m *policyEndpointsManager) getListOfEndpointInfoFromHash(hashes []string, epInfo map[string]policyinfo.EndpointInfo) []policyinfo.EndpointInfo {
var ruleList []policyinfo.EndpointInfo
for _, key := range hashes {
ruleList = append(ruleList, epInfo[key])
}
return ruleList
}
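// getEndpointInfoKey derives a deterministic identity for an endpoint entry by hashing
// its CIDR, except list and ports with SHA-256; entries with identical CIDR, exceptions
// and ports produce the same key.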
func (m *policyEndpointsManager) getEndpointInfoKey(info policyinfo.EndpointInfo) string {
hasher := sha256.New()
hasher.Write([]byte(info.CIDR))
for _, except := range info.Except {
hasher.Write([]byte(except))
}
for _, port := range info.Ports {
if port.Protocol != nil {
hasher.Write([]byte(*port.Protocol))
}
if port.Port != nil {
hasher.Write([]byte(strconv.Itoa(int(*port.Port))))
}
if port.EndPort != nil {
hasher.Write([]byte(strconv.Itoa(int(*port.EndPort))))
}
}
return hex.EncodeToString(hasher.Sum(nil))
}
// processExistingPolicyEndpoints reconciles the existing policy endpoints with the incoming network policy changes.
// it returns the ingress rules, egress rules and pod selector endpoints that still need to be placed,
// along with the policy endpoints that were modified and those that are candidates for deletion.
func (m *policyEndpointsManager) processExistingPolicyEndpoints(
policy *networking.NetworkPolicy,
existingPolicyEndpoints []policyinfo.PolicyEndpoint, ingressEndpoints []policyinfo.EndpointInfo,
egressEndpoints []policyinfo.EndpointInfo, podSelectorEndpoints []policyinfo.PodEndpoint,
) (
map[string]policyinfo.EndpointInfo,
map[string]policyinfo.EndpointInfo,
sets.Set[policyinfo.PodEndpoint],
[]policyinfo.PolicyEndpoint,
[]policyinfo.PolicyEndpoint,
) {
// index ingressEndpoints and egressEndpoints by their hash keys, and podSelectorEndpoints as a set
ingressEndpointsMap := map[string]policyinfo.EndpointInfo{}
for _, ingressEndpoint := range ingressEndpoints {
ingressEndpointsMap[m.getEndpointInfoKey(ingressEndpoint)] = ingressEndpoint
}
egressEndpointsMap := map[string]policyinfo.EndpointInfo{}
for _, egressEndpoint := range egressEndpoints {
egressEndpointsMap[m.getEndpointInfoKey(egressEndpoint)] = egressEndpoint
}
podSelectorEndpointSet := sets.New[policyinfo.PodEndpoint](podSelectorEndpoints...)
// Go over the existing endpoints, and remove entries that are no longer needed
var modifiedEndpoints []policyinfo.PolicyEndpoint
var potentialDeletes []policyinfo.PolicyEndpoint
// We loop through the existing PolicyEndpoint resources for the current network policy and purge any stale entries across their
// Ingress, Egress and PodSelector endpoints. Once a PolicyEndpoint resource is processed we place it in the modifiedEndpoints list,
// and if it is purged of all its endpoints we mark it as a potential delete candidate instead.
// We then bin-pack any new Ingress, Egress and PodSelector endpoints across the existing PolicyEndpoint resources held in the
// modified and potential-delete lists, and only create new PolicyEndpoint resources once all the existing ones are full.
// Any PolicyEndpoint resources left in the potentialDeletes bucket that aren't reused by the end of the bin-packing flow are deleted.
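// As an illustrative example (assuming a chunk size of 3): if an existing PolicyEndpoint
// still holds 2 valid ingress rules and the policy now resolves to 4 new ingress rules,
// the packing step appends 1 rule to the existing resource to fill it to capacity and
// creates one new PolicyEndpoint for the remaining 3 rules.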
for i := range existingPolicyEndpoints {
ingEndpointList := make([]policyinfo.EndpointInfo, 0, len(existingPolicyEndpoints[i].Spec.Ingress))
for _, ingRule := range existingPolicyEndpoints[i].Spec.Ingress {
ruleKey := m.getEndpointInfoKey(ingRule)
if _, exists := ingressEndpointsMap[ruleKey]; exists {
ingEndpointList = append(ingEndpointList, ingRule)
delete(ingressEndpointsMap, ruleKey)
}
}
egEndpointList := make([]policyinfo.EndpointInfo, 0, len(existingPolicyEndpoints[i].Spec.Egress))
for _, egRule := range existingPolicyEndpoints[i].Spec.Egress {
ruleKey := m.getEndpointInfoKey(egRule)
if _, exists := egressEndpointsMap[ruleKey]; exists {
egEndpointList = append(egEndpointList, egRule)
delete(egressEndpointsMap, ruleKey)
}
}
podSelectorEndpointList := make([]policyinfo.PodEndpoint, 0, len(existingPolicyEndpoints[i].Spec.PodSelectorEndpoints))
for _, ps := range existingPolicyEndpoints[i].Spec.PodSelectorEndpoints {
if podSelectorEndpointSet.Has(ps) {
podSelectorEndpointList = append(podSelectorEndpointList, ps)
podSelectorEndpointSet.Delete(ps)
}
}
policyEndpointChanged := false
if !equality.Semantic.DeepEqual(policy.Spec.PolicyTypes, existingPolicyEndpoints[i].Spec.PodIsolation) {
existingPolicyEndpoints[i].Spec.PodIsolation = policy.Spec.PolicyTypes
policyEndpointChanged = true
}
if len(ingEndpointList) == 0 && len(egEndpointList) == 0 && len(podSelectorEndpointList) == 0 {
existingPolicyEndpoints[i].Spec.Ingress = ingEndpointList
existingPolicyEndpoints[i].Spec.Egress = egEndpointList
existingPolicyEndpoints[i].Spec.PodSelectorEndpoints = podSelectorEndpointList
potentialDeletes = append(potentialDeletes, existingPolicyEndpoints[i])
} else if len(existingPolicyEndpoints[i].Spec.Ingress) != len(ingEndpointList) || len(existingPolicyEndpoints[i].Spec.Egress) != len(egEndpointList) ||
len(existingPolicyEndpoints[i].Spec.PodSelectorEndpoints) != len(podSelectorEndpointList) || policyEndpointChanged {
existingPolicyEndpoints[i].Spec.Ingress = ingEndpointList
existingPolicyEndpoints[i].Spec.Egress = egEndpointList
existingPolicyEndpoints[i].Spec.PodSelectorEndpoints = podSelectorEndpointList
modifiedEndpoints = append(modifiedEndpoints, existingPolicyEndpoints[i])
} else {
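// rules unchanged: keep the endpoint in modifiedEndpoints so the packing step can still
// use its spare capacity; Reconcile skips it later if its spec ends up semantically identical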
modifiedEndpoints = append(modifiedEndpoints, existingPolicyEndpoints[i])
}
}
return ingressEndpointsMap, egressEndpointsMap, podSelectorEndpointSet, modifiedEndpoints, potentialDeletes
}
// packingIngressRules iterates over ingress rules across available policy endpoints and required ingress rule changes.
// it returns the ingress rules packed in policy endpoints and a set of policy endpoints that need to be kept.
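// The packing walks the pending rules with a start and an end index: for each candidate
// policy endpoint it appends just enough rules to reach endpointChunkSize, and only the
// rules left over once every existing candidate is full go into newly created policy endpoints.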
func (m *policyEndpointsManager) packingIngressRules(policy *networking.NetworkPolicy,
rulesMap map[string]policyinfo.EndpointInfo,
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) {
doNotDelete := sets.Set[types.NamespacedName]{}
chunkStartIdx := 0
chunkEndIdx := 0
ingressList := maps.Keys(rulesMap)
// try to fill existing policy endpoints first and only then create new ones if needed
for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} {
for i := range sliceToCheck {
// catch the start pointer up to wherever the end pointer stopped last iteration
chunkStartIdx = chunkEndIdx
// instead of appending an entire chunk at once, fill this policy endpoint up to the chunk size limit
if len(sliceToCheck[i].Spec.Ingress) < m.endpointChunkSize && chunkEndIdx < len(ingressList) {
for len(sliceToCheck[i].Spec.Ingress)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(ingressList)-1 {
chunkEndIdx++
}
sliceToCheck[i].Spec.Ingress = append(sliceToCheck[i].Spec.Ingress, m.getListOfEndpointInfoFromHash(lo.Slice(ingressList, chunkStartIdx, chunkEndIdx+1), rulesMap)...)
// advance the end pointer past the entries just consumed, ready for the next append
chunkEndIdx++
}
// if the end pointer advanced, entries were packed into this policy endpoint, so it must be kept
if chunkStartIdx != chunkEndIdx {
doNotDelete.Insert(k8s.NamespacedName(&sliceToCheck[i]))
}
}
}
// if not all incoming ingress rules have been placed yet, we need new PE(s).
if chunkEndIdx < len(ingressList) {
ingressRuleChunks := lo.Chunk(ingressList[chunkEndIdx:], m.endpointChunkSize)
for _, chunk := range ingressRuleChunks {
newEP := m.newPolicyEndpoint(policy, m.getListOfEndpointInfoFromHash(chunk, rulesMap), nil, nil)
createPolicyEndpoints = append(createPolicyEndpoints, newEP)
}
}
return createPolicyEndpoints, doNotDelete
}
// packingEgressRules iterates over egress rules across available policy endpoints and required egress rule changes.
// it returns the egress rules packed in policy endpoints and a set of policy endpoints that need to be kept.
func (m *policyEndpointsManager) packingEgressRules(policy *networking.NetworkPolicy,
rulesMap map[string]policyinfo.EndpointInfo,
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) {
doNotDelete := sets.Set[types.NamespacedName]{}
chunkStartIdx := 0
chunkEndIdx := 0
egressList := maps.Keys(rulesMap)
// try to fill existing policy endpoints first and only then create new ones if needed
for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} {
for i := range sliceToCheck {
// catch the start pointer up to wherever the end pointer stopped last iteration
chunkStartIdx = chunkEndIdx
// instead of appending an entire chunk at once, fill this policy endpoint up to the chunk size limit
if len(sliceToCheck[i].Spec.Egress) < m.endpointChunkSize && chunkEndIdx < len(egressList) {
for len(sliceToCheck[i].Spec.Egress)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(egressList)-1 {
chunkEndIdx++
}
sliceToCheck[i].Spec.Egress = append(sliceToCheck[i].Spec.Egress, m.getListOfEndpointInfoFromHash(lo.Slice(egressList, chunkStartIdx, chunkEndIdx+1), rulesMap)...)
// advance the end pointer past the entries just consumed, ready for the next append
chunkEndIdx++
}
// if the end pointer advanced, entries were packed into this policy endpoint, so it must be kept
if chunkStartIdx != chunkEndIdx {
doNotDelete.Insert(k8s.NamespacedName(&sliceToCheck[i]))
}
}
}
// if not all incoming egress rules have been placed yet, we need new PE(s).
if chunkEndIdx < len(egressList) {
egressRuleChunks := lo.Chunk(egressList[chunkEndIdx:], m.endpointChunkSize)
for _, chunk := range egressRuleChunks {
newEP := m.newPolicyEndpoint(policy, nil, m.getListOfEndpointInfoFromHash(chunk, rulesMap), nil)
createPolicyEndpoints = append(createPolicyEndpoints, newEP)
}
}
return createPolicyEndpoints, doNotDelete
}
// packingPodSelectorEndpoints iterates over pod selectors across available policy endpoints and required pod selector changes.
// it returns the pod selectors packed in policy endpoints and a set of policy endpoints that need to be kept.
func (m *policyEndpointsManager) packingPodSelectorEndpoints(policy *networking.NetworkPolicy,
psList []policyinfo.PodEndpoint,
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) {
doNotDelete := sets.Set[types.NamespacedName]{}
chunkStartIdx := 0
chunkEndIdx := 0
// try to fill existing policy endpoints first and only then create new ones if needed
for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} {
for i := range sliceToCheck {
// catch the start pointer up to wherever the end pointer stopped last iteration
chunkStartIdx = chunkEndIdx
// instead of appending an entire chunk at once, fill this policy endpoint up to the chunk size limit
if len(sliceToCheck[i].Spec.PodSelectorEndpoints) < m.endpointChunkSize && chunkEndIdx < len(psList) {
for len(sliceToCheck[i].Spec.PodSelectorEndpoints)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(psList)-1 {
chunkEndIdx++
}
sliceToCheck[i].Spec.PodSelectorEndpoints = append(sliceToCheck[i].Spec.PodSelectorEndpoints, lo.Slice(psList, chunkStartIdx, chunkEndIdx+1)...)
// advance the end pointer past the entries just consumed, ready for the next append
chunkEndIdx++
}
// if the end pointer advanced, entries were packed into this policy endpoint, so it must be kept
if chunkStartIdx != chunkEndIdx {
doNotDelete.Insert(k8s.NamespacedName(&sliceToCheck[i]))
}
}
}
// if not all incoming pod selector endpoints have been placed yet, we need new PE(s).
if chunkEndIdx < len(psList) {
psChunks := lo.Chunk(psList[chunkEndIdx:], m.endpointChunkSize)
for _, chunk := range psChunks {
newEP := m.newPolicyEndpoint(policy, nil, nil, chunk)
createPolicyEndpoints = append(createPolicyEndpoints, newEP)
}
}
return createPolicyEndpoints, doNotDelete
}