pkg/k8s/policyhelper/policy.go (317 lines of code) (raw):
package policyhelper
import (
"context"
"errors"
"fmt"
"strings"
"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/builder"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/utils"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
)
var (
ErrGroupKind = errors.New("group/kind error")
ErrTargetRefNotFound = errors.New("targetRef not found")
ErrTargetRefConflict = errors.New("targetRef has conflict")
)
type (
TargetRef = gwv1alpha2.NamespacedPolicyTargetReference
ConditionType = gwv1alpha2.PolicyConditionType
ConditionReason = gwv1alpha2.PolicyConditionReason
)
const (
// GEP
ConditionTypeAccepted = gwv1alpha2.PolicyConditionAccepted
ReasonAccepted = gwv1alpha2.PolicyReasonAccepted
ReasonInvalid = gwv1alpha2.PolicyReasonInvalid
ReasonTargetNotFound = gwv1alpha2.PolicyReasonTargetNotFound
ReasonConflicted = gwv1alpha2.PolicyReasonConflicted
// Non-GEP
ReasonUnknown = ConditionReason("Unknown")
)
type (
TGP = anv1alpha1.TargetGroupPolicy
TGPL = anv1alpha1.TargetGroupPolicyList
IAP = anv1alpha1.IAMAuthPolicy
IAPL = anv1alpha1.IAMAuthPolicyList
VAP = anv1alpha1.VpcAssociationPolicy
VAPL = anv1alpha1.VpcAssociationPolicyList
)
func NewVpcAssociationPolicyHandler(log gwlog.Logger, c k8sclient.Client) *PolicyHandler[*VAP] {
phcfg := PolicyHandlerConfig{
Log: log,
Client: c,
TargetRefKinds: NewGroupKindSet(&gwv1.Gateway{}),
}
return NewPolicyHandler[VAP, VAPL](phcfg)
}
func NewTargetGroupPolicyHandler(log gwlog.Logger, c k8sclient.Client) *PolicyHandler[*TGP] {
phcfg := PolicyHandlerConfig{
Log: log,
Client: c,
TargetRefKinds: NewGroupKindSet(&corev1.Service{}, &anv1alpha1.ServiceExport{}),
}
return NewPolicyHandler[TGP, TGPL](phcfg)
}
func NewIAMAuthPolicyHandler(log gwlog.Logger, c k8sclient.Client) *PolicyHandler[*IAP] {
phcfg := PolicyHandlerConfig{
Log: log,
Client: c,
TargetRefKinds: NewGroupKindSet(&gwv1.Gateway{}, &gwv1.HTTPRoute{}, &gwv1.GRPCRoute{}),
}
return NewPolicyHandler[IAP, IAPL](phcfg)
}
// Policy with PolicyTargetReference
type Policy interface {
k8sclient.Object
GetTargetRef() *TargetRef
GetStatusConditions() *[]metav1.Condition
}
type PolicyList[P Policy] interface {
k8sclient.ObjectList
GetItems() []P
}
type GroupKindSet = utils.Set[GroupKind]
func NewGroupKindSet(objs ...k8sclient.Object) *GroupKindSet {
gks := utils.SliceMap(objs, func(o k8sclient.Object) GroupKind {
return ObjToGroupKind(o)
})
set := utils.NewSet(gks...)
return &set
}
// A generic handler for common operations on particular policy type
type PolicyHandler[P Policy] struct {
log gwlog.Logger
kinds *GroupKindSet
client PolicyClient[P]
}
type PolicyHandlerConfig struct {
Log gwlog.Logger
Client k8sclient.Client
TargetRefKinds *GroupKindSet
}
// Creates policy handler for specific policy. T and TL are type and list-type for Policy (struct type, not reference).
// P and PL are reference types and should derive from T and TL. P and PL do not require explicit declaration. For example:
//
// ph := NewPolicyHandler[IAMAuthPolicy, IAMAuthPolicyList](cfg)
func NewPolicyHandler[T, TL any, P policyPtr[T], PL policyListPtr[TL, P]](cfg PolicyHandlerConfig) *PolicyHandler[P] {
ph := &PolicyHandler[P]{
log: cfg.Log,
client: newK8sPolicyClient[T, TL, P, PL](cfg.Client),
kinds: cfg.TargetRefKinds,
}
return ph
}
// Strong-typed interface to work with k8s client
type PolicyClient[P Policy] interface {
List(ctx context.Context, namespace string) ([]P, error)
Get(ctx context.Context, nsname types.NamespacedName) (P, error)
TargetRefObj(ctx context.Context, policy P) (k8sclient.Object, error)
UpdateStatus(ctx context.Context, policy P) error
}
type policyPtr[T any] interface {
Policy
*T
}
type policyListPtr[T any, P Policy] interface {
PolicyList[P]
*T
}
// k8s client based implementation of PolicyClient
type k8sPolicyClient[T, U any, P policyPtr[T], PL policyListPtr[U, P]] struct {
client k8sclient.Client
}
func newK8sPolicyClient[T, U any, P policyPtr[T], PL policyListPtr[U, P]](c k8sclient.Client) *k8sPolicyClient[T, U, P, PL] {
return &k8sPolicyClient[T, U, P, PL]{client: c}
}
func (pc *k8sPolicyClient[T, U, P, PL]) newList() PL {
var u U
return &u
}
func (pc *k8sPolicyClient[T, U, P, PL]) newPolicy() P {
var t T
return &t
}
func (pc *k8sPolicyClient[T, U, P, PL]) List(ctx context.Context, namespace string) ([]P, error) {
l := pc.newList()
err := pc.client.List(ctx, l, &k8sclient.ListOptions{Namespace: namespace})
if err != nil {
return nil, err
}
return l.GetItems(), nil
}
func (pc *k8sPolicyClient[T, U, P, PL]) Get(ctx context.Context, nsname types.NamespacedName) (P, error) {
p := pc.newPolicy()
err := pc.client.Get(ctx, nsname, p)
return p, err
}
func (pc *k8sPolicyClient[T, U, P, PL]) TargetRefObj(ctx context.Context, p P) (k8sclient.Object, error) {
tr := p.GetTargetRef()
obj, ok := GroupKindToObj(TargetRefGroupKind(tr))
if !ok {
return nil, fmt.Errorf("not supported GroupKind of targetRef, group/kind=%s/%s",
tr.Group, tr.Kind)
}
key := types.NamespacedName{
Namespace: p.GetNamespace(),
Name: string(tr.Name),
}
err := pc.client.Get(ctx, key, obj)
if err != nil {
return nil, err
}
return obj, nil
}
func (pc *k8sPolicyClient[T, U, P, PL]) UpdateStatus(ctx context.Context, policy P) error {
return pc.client.Status().Update(ctx, policy)
}
// Get all policies for given object, filtered by targetRef match and sorted by conflict resolution
// rules. First policy in the list is not-conflicting policy, but it might be in Accepted or Invalid
// state. Conflict resolution order uses CreationTimestamp and Name.
func (h *PolicyHandler[P]) ObjPolicies(ctx context.Context, obj k8sclient.Object) ([]P, error) {
allPolicies, err := h.client.List(ctx, obj.GetNamespace())
if err != nil {
return nil, err
}
out := []P{}
for _, policy := range allPolicies {
tr := policy.GetTargetRef()
if h.targetRefMatch(obj, tr) {
out = append(out, policy)
}
}
h.conflictResolutionSort(out)
return out, nil
}
// Get Accepted policy for given object. Returns policy with conflict resolution and status
// Accepted. Will return at most single policy.
func (h *PolicyHandler[P]) ObjResolvedPolicy(ctx context.Context, obj k8sclient.Object) (P, error) {
var empty P
objPolicies, err := h.ObjPolicies(ctx, obj)
if err != nil {
return empty, err
}
if len(objPolicies) == 0 {
return empty, nil
}
policy := objPolicies[0]
cnd := meta.FindStatusCondition(*policy.GetStatusConditions(), string(ConditionTypeAccepted))
if cnd != nil && cnd.Reason != string(ReasonAccepted) {
return empty, nil
}
return objPolicies[0], nil
}
// Add Watchers for configured Kinds to controller builder
func (h *PolicyHandler[P]) AddWatchers(b *builder.Builder, objs ...k8sclient.Object) {
h.log.Debugf(context.TODO(), "add watchers for types: %v", NewGroupKindSet(objs...).Items())
for _, watchObj := range objs {
b.Watches(watchObj, handler.EnqueueRequestsFromMapFunc(h.watchMapFn))
}
}
func (h *PolicyHandler[P]) watchMapFn(ctx context.Context, obj k8sclient.Object) []reconcile.Request {
out := []reconcile.Request{}
policies, err := h.client.List(ctx, obj.GetNamespace())
if err != nil {
h.log.Errorf(ctx, "watch mapfn error: for obj=%s/%s: %s",
obj.GetName(), obj.GetNamespace(), err)
return nil
}
for _, policy := range policies {
if h.targetRefMatch(obj, policy.GetTargetRef()) {
out = append(out, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: policy.GetName(),
Namespace: policy.GetNamespace(),
},
})
}
}
return out
}
// Checks if objects matches targetReference, returns true if they match
// targetRef might not have namespace set, it should be inferred from policy itself.
// In this case we assume namespace already checked
func (h *PolicyHandler[P]) targetRefMatch(obj k8sclient.Object, tr *gwv1alpha2.NamespacedPolicyTargetReference) bool {
objGk := ObjToGroupKind(obj)
trGk := TargetRefGroupKind(tr)
return objGk == trGk && obj.GetName() == string(tr.Name)
}
// Validate Policy and update Accepted status condition.
func (h *PolicyHandler[P]) ValidateAndUpdateCondition(ctx context.Context, policy P) (ConditionReason, error) {
validationErr := h.ValidateTargetRef(ctx, policy)
reason := errToReason(validationErr)
msg := ""
if validationErr != nil {
msg = validationErr.Error()
}
err := h.UpdateAcceptedCondition(ctx, policy, reason, msg)
if err != nil {
return ReasonUnknown, err
}
return reason, nil
}
func (h *PolicyHandler[P]) ValidateTargetRef(ctx context.Context, policy P) error {
tr := policy.GetTargetRef()
// invalid
trGk := TargetRefGroupKind(tr)
if !h.kinds.Contains(trGk) {
return fmt.Errorf("%w: not supported GroupKind=%s/%s",
ErrGroupKind, tr.Group, tr.Kind)
}
// not found
targetRefObj, err := h.client.TargetRefObj(ctx, policy)
if err != nil {
if apierrors.IsNotFound(err) {
return fmt.Errorf("%w, target=%s/%s",
ErrTargetRefNotFound, policy.GetNamespace(), tr.Name)
}
return err
}
// conflicted
objPolicies, err := h.ObjPolicies(ctx, targetRefObj)
if err != nil {
return err
}
if len(objPolicies) > 0 {
resolvedPolicy := objPolicies[0]
if resolvedPolicy.GetName() != policy.GetName() {
return fmt.Errorf("%w, policy=%s",
ErrTargetRefConflict, resolvedPolicy.GetName())
}
}
// valid
return nil
}
func errToReason(err error) ConditionReason {
switch {
case err == nil:
return ReasonAccepted
case errors.Is(err, ErrGroupKind):
return ReasonInvalid
case errors.Is(err, ErrTargetRefNotFound):
return ReasonTargetNotFound
case errors.Is(err, ErrTargetRefConflict):
return ReasonConflicted
default:
return ReasonUnknown
}
}
func (h *PolicyHandler[P]) UpdateAcceptedCondition(ctx context.Context, policy P, reason ConditionReason, msg string) error {
status := metav1.ConditionTrue
if reason != ReasonAccepted {
status = metav1.ConditionFalse
}
cnd := metav1.Condition{
Type: string(ConditionTypeAccepted),
Status: status,
ObservedGeneration: policy.GetGeneration(),
Reason: string(reason),
Message: msg,
}
meta.SetStatusCondition(policy.GetStatusConditions(), cnd)
err := h.client.UpdateStatus(ctx, policy)
return err
}
// sort in-place for policy conflict resolution
// 1. older policy (CreationTimeStamp) has precedence
// 2. alphabetical order namespace, then name
func (h *PolicyHandler[P]) conflictResolutionSort(policies []P) {
slices.SortFunc(policies, func(a, b P) int {
tsA := a.GetCreationTimestamp().Time
tsB := b.GetCreationTimestamp().Time
switch {
case tsA.Before(tsB):
return -1
case tsA.After(tsB):
return 1
default:
nA := a.GetName()
nB := b.GetName()
return strings.Compare(nA, nB)
}
})
}