pkg/webhook/pod_readiness_gate_injector.go (161 lines of code) (raw):
package webhook
import (
"context"
anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
k8sutils "github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
)
const (
PodReadinessGateConditionType = "application-networking.k8s.aws/pod-readiness-gate"
)
func NewPodReadinessGateInjector(k8sClient client.Client, log gwlog.Logger) *PodReadinessGateInjector {
return &PodReadinessGateInjector{
k8sClient: k8sClient,
log: log,
}
}
type PodReadinessGateInjector struct {
k8sClient client.Client
log gwlog.Logger
}
func (m *PodReadinessGateInjector) MutateCreate(ctx context.Context, pod *corev1.Pod) error {
pct := corev1.PodConditionType(PodReadinessGateConditionType)
m.log.Debugf(ctx, "Webhook invoked for pod %s/%s", pod.Namespace, getPodName(pod))
found := false
for _, rg := range pod.Spec.ReadinessGates {
if rg.ConditionType == pct {
found = true
break
}
}
if !found {
requiresGate, err := m.requiresReadinessGate(ctx, pod)
if err != nil {
return err
}
if requiresGate {
pod.Spec.ReadinessGates = append(pod.Spec.ReadinessGates, corev1.PodReadinessGate{
ConditionType: pct,
})
}
}
return nil
}
// checks if the pod requires a readiness gate
// mostly debug logs to reduce noise, intended to be tolerant of most failures
func (m *PodReadinessGateInjector) requiresReadinessGate(ctx context.Context, pod *corev1.Pod) (bool, error) {
// fetch all services in the namespace, see if their selector matches the pod
svcList := &corev1.ServiceList{}
if err := m.k8sClient.List(ctx, svcList, client.InNamespace(pod.Namespace)); err != nil {
return false, errors.Wrap(err, "unable to determine readiness gate requirement")
}
svcMatches := m.servicesForPod(pod, svcList)
if len(svcMatches) == 0 {
m.log.Debugf(ctx, "No services found for pod %s/%s", pod.Namespace, getPodName(pod))
return false, nil
}
// for each route, check if it has a backendRef to one of the services
routes := m.listAllRoutes(ctx)
for _, route := range routes {
if svc := m.isPodUsedByRoute(route, svcMatches); svc != nil {
if m.routeHasLatticeGateway(ctx, route) {
m.log.Debugf(ctx, "Pod %s/%s is used by service %s/%s and route %s/%s", pod.Namespace, getPodName(pod),
svc.Namespace, svc.Name, route.Namespace(), route.Name())
return true, nil
}
}
}
// lastly, check if there's a service export for any of the services
for _, svc := range svcMatches {
svcExport := &anv1alpha1.ServiceExport{}
if err := m.k8sClient.Get(ctx, k8sutils.NamespacedName(svc), svcExport); err != nil {
continue
}
m.log.Debugf(ctx, "Pod %s/%s is used by service %s/%s and service export %s/%s", pod.Namespace, getPodName(pod),
svc.Namespace, svc.Name, svcExport.Namespace, svcExport.Name)
return true, nil
}
m.log.Debugf(ctx, "Pod %s/%s does not require a readiness gate", pod.Namespace, getPodName(pod))
return false, nil
}
func (m *PodReadinessGateInjector) listAllRoutes(ctx context.Context) []core.Route {
// fetch all routes in all namespaces - backendRefs can reference other namespaces
var routes []core.Route
httpRouteList := &gwv1.HTTPRouteList{}
err := m.k8sClient.List(ctx, httpRouteList)
if err != nil {
m.log.Errorf(ctx, "Error fetching HTTPRoutes: %s", err)
}
for _, k8sRoute := range httpRouteList.Items {
routes = append(routes, core.NewHTTPRoute(k8sRoute))
}
grpcRouteList := &gwv1.GRPCRouteList{}
err = m.k8sClient.List(ctx, grpcRouteList)
if err != nil {
m.log.Errorf(ctx, "Error fetching GRPCRoutes: %s", err)
}
for _, k8sRoute := range grpcRouteList.Items {
routes = append(routes, core.NewGRPCRoute(k8sRoute))
}
return routes
}
func getPodName(pod *corev1.Pod) string {
if pod == nil {
return ""
} else if pod.Name == "" {
return pod.GenerateName
} else {
return pod.Name
}
}
// returns a map of services that match the pod labels
func (m *PodReadinessGateInjector) servicesForPod(pod *corev1.Pod, svcList *corev1.ServiceList) map[string]*corev1.Service {
svcMatches := make(map[string]*corev1.Service)
podLabels := labels.Set(pod.Labels)
for _, svc := range svcList.Items {
svcSelector := labels.SelectorFromSet(svc.Spec.Selector)
if svcSelector.Matches(podLabels) {
m.log.Debugf(context.TODO(), "Found service %s/%s that matches pod %s/%s",
svc.Namespace, svc.Name, pod.Namespace, getPodName(pod))
svcMatches[svc.Name] = &svc
}
}
return svcMatches
}
func (m *PodReadinessGateInjector) isPodUsedByRoute(route core.Route, svcMap map[string]*corev1.Service) *corev1.Service {
for _, rule := range route.Spec().Rules() {
for _, backendRef := range rule.BackendRefs() {
// from spec: "When [Group] unspecified or empty string, core API group is inferred."
isGroupEqual := backendRef.Group() == nil || string(*backendRef.Group()) == corev1.GroupName
isKindEqual := backendRef.Kind() != nil && string(*backendRef.Kind()) == "Service"
svc, isNameEqual := svcMap[string(backendRef.Name())]
namespace := route.Namespace()
if backendRef.Namespace() != nil {
namespace = string(*backendRef.Namespace())
}
isNamespaceEqual := svc != nil && namespace == svc.GetNamespace()
if isGroupEqual && isKindEqual && isNameEqual && isNamespaceEqual {
m.log.Debugf(context.TODO(), "Found route %s/%s that matches service %s/%s",
route.Namespace(), route.Name(), svc.Namespace, svc.Name)
return svc
}
}
}
return nil
}
func (m *PodReadinessGateInjector) routeHasLatticeGateway(ctx context.Context, route core.Route) bool {
if len(route.Spec().ParentRefs()) == 0 {
m.log.Debugf(ctx, "Route %s/%s has no parentRefs", route.Namespace(), route.Name())
return false
}
parents, err := k8sutils.FindControlledParents(ctx, m.k8sClient, route)
// If there is at least one parent element and an error exists,
// it is not an error related to the parent controlled by the AWS Gateway API Controller, so return true
if len(parents) > 0 {
gw := parents[0]
m.log.Debugf(ctx, "Gateway %s/%s is a AWS Gateway API Controller", gw.Namespace, gw.Name)
return true
}
if err != nil {
m.log.Debugf(ctx, "Unable to retrieve controlled parents for route %s/%s, %s", route.Namespace(), route.Name(), err)
return false
}
m.log.Debugf(ctx, "Route %s/%s has no controlled AWS Gateway API Controller", route.Namespace(), route.Name())
return false
}