pkg/resolvers/endpoints.go (448 lines of code) (raw):
package resolvers
import (
"context"
"fmt"
"strconv"
policyinfo "github.com/aws/amazon-network-policy-controller-k8s/api/v1alpha1"
"github.com/aws/amazon-network-policy-controller-k8s/pkg/k8s"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"golang.org/x/exp/maps"
corev1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type EndpointsResolver interface {
// Resolve returns the resolved endpoints for the given policy ingress, egress rules and pod selector labels.
Resolve(ctx context.Context, policy *networking.NetworkPolicy) ([]policyinfo.EndpointInfo, []policyinfo.EndpointInfo,
[]policyinfo.PodEndpoint, error)
}
// NewEndpointsResolver constructs a new defaultEndpointsResolver
func NewEndpointsResolver(k8sClient client.Client, logger logr.Logger) *defaultEndpointsResolver {
return &defaultEndpointsResolver{
k8sClient: k8sClient,
logger: logger,
}
}
var _ EndpointsResolver = (*defaultEndpointsResolver)(nil)
type defaultEndpointsResolver struct {
k8sClient client.Client
logger logr.Logger
}
func (r *defaultEndpointsResolver) Resolve(ctx context.Context, policy *networking.NetworkPolicy) ([]policyinfo.EndpointInfo,
[]policyinfo.EndpointInfo, []policyinfo.PodEndpoint, error) {
ingressEndpoints, err := r.computeIngressEndpoints(ctx, policy)
if err != nil {
return nil, nil, nil, err
}
egressEndpoints, err := r.computeEgressEndpoints(ctx, policy)
if err != nil {
return nil, nil, nil, err
}
podSelectorEndpoints, err := r.computePodSelectorEndpoints(ctx, policy)
if err != nil {
return nil, nil, nil, err
}
r.logger.Info("Resolved endpoints", "policy", k8s.NamespacedName(policy), "ingress", len(ingressEndpoints), "egress",
len(egressEndpoints), "pod selector endpoints", len(podSelectorEndpoints))
return ingressEndpoints, egressEndpoints, podSelectorEndpoints, nil
}
func (r *defaultEndpointsResolver) computeIngressEndpoints(ctx context.Context, policy *networking.NetworkPolicy) ([]policyinfo.EndpointInfo, error) {
var ingressEndpoints []policyinfo.EndpointInfo
for _, rule := range policy.Spec.Ingress {
r.logger.V(1).Info("computing ingress addresses", "peers", rule.From)
if rule.From == nil {
ingressEndpoints = append(ingressEndpoints, r.getAllowAllNetworkPeers(ctx, policy, rule.Ports, networking.PolicyTypeIngress)...)
continue
}
resolvedPeers, err := r.resolveNetworkPeers(ctx, policy, rule.From, rule.Ports, networking.PolicyTypeIngress)
if err != nil {
return nil, errors.Wrap(err, "unable to resolve ingress network peers")
}
ingressEndpoints = append(ingressEndpoints, resolvedPeers...)
}
r.logger.V(1).Info("Resolved ingress rules", "policy", k8s.NamespacedName(policy), "addresses", ingressEndpoints)
return ingressEndpoints, nil
}
func (r *defaultEndpointsResolver) computeEgressEndpoints(ctx context.Context, policy *networking.NetworkPolicy) ([]policyinfo.EndpointInfo, error) {
var egressEndpoints []policyinfo.EndpointInfo
for _, rule := range policy.Spec.Egress {
r.logger.V(1).Info("computing egress addresses", "peers", rule.To)
if rule.To == nil {
egressEndpoints = append(egressEndpoints, r.getAllowAllNetworkPeers(ctx, policy, rule.Ports, networking.PolicyTypeEgress)...)
continue
}
resolvedPeers, err := r.resolveNetworkPeers(ctx, policy, rule.To, rule.Ports, networking.PolicyTypeEgress)
if err != nil {
return nil, errors.Wrap(err, "unable to resolve egress network peers")
}
resolvedClusterIPs, err := r.resolveServiceClusterIPs(ctx, rule.To, policy.Namespace, rule.Ports)
if err != nil {
return nil, errors.Wrap(err, "unable to resolve service cluster IPs for egress")
}
egressEndpoints = append(egressEndpoints, resolvedPeers...)
egressEndpoints = append(egressEndpoints, resolvedClusterIPs...)
}
r.logger.V(1).Info("Resolved egress rules", "policy", k8s.NamespacedName(policy), "addresses", egressEndpoints)
return egressEndpoints, nil
}
func (r *defaultEndpointsResolver) computePodSelectorEndpoints(ctx context.Context, policy *networking.NetworkPolicy) ([]policyinfo.PodEndpoint, error) {
var podEndpoints []policyinfo.PodEndpoint
podSelector, err := metav1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
if err != nil {
return nil, errors.Wrap(err, "unable to get pod selector")
}
podList := &corev1.PodList{}
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
LabelSelector: podSelector,
Namespace: policy.Namespace,
}); err != nil {
r.logger.Info("Unable to List Pods", "err", err)
return nil, err
}
for _, pod := range podList.Items {
podIP := k8s.GetPodIP(&pod)
if len(podIP) > 0 {
podEndpoints = append(podEndpoints, policyinfo.PodEndpoint{
PodIP: policyinfo.NetworkAddress(podIP),
HostIP: policyinfo.NetworkAddress(pod.Status.HostIP),
Name: pod.Name,
Namespace: pod.Namespace,
})
}
}
r.logger.V(1).Info("Resolved pod selector endpoints", "policy", k8s.NamespacedName(policy), "pod endpoints", podEndpoints)
return podEndpoints, nil
}
func (r *defaultEndpointsResolver) getAllowAllNetworkPeers(ctx context.Context, policy *networking.NetworkPolicy, ports []networking.NetworkPolicyPort, policyType networking.PolicyType) []policyinfo.EndpointInfo {
var portList []policyinfo.Port
for _, port := range ports {
portInfo := r.convertToPolicyInfoPortForCIDRs(port)
if portInfo != nil {
portList = append(portList, *portInfo)
} else {
if policyType == networking.PolicyTypeIngress {
ports := r.getIngressRulesPorts(ctx, policy.Namespace, &policy.Spec.PodSelector, []networking.NetworkPolicyPort{port})
portList = append(portList, ports...)
}
}
}
if len(ports) != 0 && len(portList) == 0 {
return nil
}
return []policyinfo.EndpointInfo{
{
CIDR: "0.0.0.0/0",
Ports: portList,
},
{
CIDR: "::/0",
Ports: portList,
},
}
}
func (r *defaultEndpointsResolver) resolveNetworkPeers(ctx context.Context, policy *networking.NetworkPolicy,
peers []networking.NetworkPolicyPeer, ports []networking.NetworkPolicyPort, policyType networking.PolicyType) ([]policyinfo.EndpointInfo, error) {
var networkPeers []policyinfo.EndpointInfo
for _, peer := range peers {
if peer.IPBlock != nil {
var except []policyinfo.NetworkAddress
for _, ea := range peer.IPBlock.Except {
except = append(except, policyinfo.NetworkAddress(ea))
}
var portList []policyinfo.Port
for _, port := range ports {
portInfo := r.convertToPolicyInfoPortForCIDRs(port)
if portInfo != nil {
portList = append(portList, *portInfo)
} else {
if policyType == networking.PolicyTypeIngress {
ports := r.getIngressRulesPorts(ctx, policy.Namespace, &policy.Spec.PodSelector, []networking.NetworkPolicyPort{port})
portList = append(portList, ports...)
}
}
}
// A non-empty input port list would imply the user wants to allow traffic only on the specified ports.
// However, in this case we are not able to resolve any of the ports from the CIDR list alone. In this
// case we do not add the CIDR to the list of resolved peers to prevent allow all ports.
if len(ports) != 0 && len(portList) == 0 {
r.logger.Info("Couldn't resolve ports from given CIDR list and will skip this rule", "peer", peer)
continue
}
networkPeers = append(networkPeers, policyinfo.EndpointInfo{
CIDR: policyinfo.NetworkAddress(peer.IPBlock.CIDR),
Except: except,
Ports: portList,
})
continue
}
var namespaces []string
if peer.NamespaceSelector != nil {
var err error
if namespaces, err = r.resolveNamespaces(ctx, peer.NamespaceSelector); err != nil {
return nil, err
}
} else {
namespaces = []string{policy.Namespace}
}
for _, ns := range namespaces {
networkPeers = append(networkPeers, r.getMatchingPodAddresses(ctx, peer.PodSelector, ns, policy, ports, policyType)...)
}
}
return networkPeers, nil
}
func (r *defaultEndpointsResolver) getIngressRulesPorts(ctx context.Context, policyNamespace string, policyPodSelector *metav1.LabelSelector, ports []networking.NetworkPolicyPort) []policyinfo.Port {
podList := &corev1.PodList{}
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
LabelSelector: r.createPodLabelSelector(policyPodSelector),
Namespace: policyNamespace,
}); err != nil {
r.logger.Info("Unable to List Pods", "err", err)
return nil
}
r.logger.V(2).Info("list pods for ingress", "podList", *podList, "namespace", policyNamespace, "selector", *policyPodSelector)
var portList []policyinfo.Port
for _, pod := range podList.Items {
portList = append(portList, r.getPortList(pod, ports)...)
r.logger.Info("Got ingress port from pod", "pod", types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}.String())
}
// since we pull ports from dst pods, we should deduplicate them
dedupedPorts := dedupPorts(portList)
r.logger.Info("Got ingress ports from dst pods", "port", dedupedPorts)
return dedupedPorts
}
func (r *defaultEndpointsResolver) getPortList(pod corev1.Pod, ports []networking.NetworkPolicyPort) []policyinfo.Port {
var portList []policyinfo.Port
for _, port := range ports {
var portPtr *int32
if port.Port != nil {
portVal, _, err := k8s.LookupContainerPortAndName(&pod, *port.Port, *port.Protocol)
if err != nil {
// Isolate the pod for the port if we are unable to resolve the named port
r.logger.Info("Unable to lookup container port", "pod", k8s.NamespacedName(&pod),
"port", *port.Port, "err", err)
continue
}
portPtr = &portVal
}
portList = append(portList, policyinfo.Port{
Protocol: port.Protocol,
Port: portPtr,
EndPort: port.EndPort,
})
}
return portList
}
func (r *defaultEndpointsResolver) resolveServiceClusterIPs(ctx context.Context, peers []networking.NetworkPolicyPeer, policyNamespace string,
ports []networking.NetworkPolicyPort) ([]policyinfo.EndpointInfo, error) {
var networkPeers []policyinfo.EndpointInfo
for _, peer := range peers {
var namespaces []string
if peer.IPBlock != nil {
continue
}
namespaces = append(namespaces, policyNamespace)
if peer.NamespaceSelector != nil {
var err error
namespaces, err = r.resolveNamespaces(ctx, peer.NamespaceSelector)
if err != nil {
return nil, err
}
}
r.logger.Info("Populated namespaces for service clusterIP lookup", "list", namespaces)
for _, ns := range namespaces {
networkPeers = append(networkPeers, r.getMatchingServiceClusterIPs(ctx, peer.PodSelector, ns, ports)...)
}
}
return networkPeers, nil
}
// convertToPolicyInfoPortForCIDRs converts the NetworkPolicyPort to policyinfo.Port. This is used for CIDR based
// rules where it is not possible to resolve the named ports.
func (r *defaultEndpointsResolver) convertToPolicyInfoPortForCIDRs(port networking.NetworkPolicyPort) *policyinfo.Port {
protocol := *port.Protocol
switch {
case port.Port == nil:
return &policyinfo.Port{
Protocol: &protocol,
}
case port.Port.Type == intstr.String:
return nil
default:
startPort := int32(port.Port.IntValue())
return &policyinfo.Port{
Port: &startPort,
Protocol: &protocol,
EndPort: port.EndPort,
}
}
}
func (r *defaultEndpointsResolver) resolveNamespaces(ctx context.Context, ls *metav1.LabelSelector) ([]string, error) {
var namespaces []string
nsSelector, err := metav1.LabelSelectorAsSelector(ls)
if err != nil {
return nil, errors.Wrap(err, "unable to get namespace selector")
}
nsList := &corev1.NamespaceList{}
if err := r.k8sClient.List(ctx, nsList, &client.ListOptions{
LabelSelector: nsSelector,
}); err != nil {
return nil, errors.Wrap(err, "unable to list namespaces")
}
for _, ns := range nsList.Items {
namespaces = append(namespaces, ns.Name)
}
return namespaces, nil
}
func (r *defaultEndpointsResolver) getMatchingPodAddresses(ctx context.Context, ls *metav1.LabelSelector, namespace string,
policy *networking.NetworkPolicy, rulePorts []networking.NetworkPolicyPort, policyType networking.PolicyType) []policyinfo.EndpointInfo {
var addresses []policyinfo.EndpointInfo
var portList []policyinfo.Port
// populate the policy applied targets' ports
// only populate ports for Ingress and from network policy namespaces as destination ports
if policyType == networking.PolicyTypeIngress {
portList = r.getIngressRulesPorts(ctx, policy.Namespace, &policy.Spec.PodSelector, rulePorts)
if len(rulePorts) != len(portList) && len(portList) == 0 {
r.logger.Info("Couldn't get matched port list from ingress of policy", "policy", types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}.String(),
"ingressPorts", rulePorts, "derivedPorts", portList)
return nil
}
}
// populate src pods for ingress and dst pods for egress
podList := &corev1.PodList{}
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
LabelSelector: r.createPodLabelSelector(ls),
Namespace: namespace,
}); err != nil {
r.logger.Info("Unable to List Pods", "err", err)
return nil
}
r.logger.V(1).Info("Got pods for label selector", "count", len(podList.Items), "selector", ls.String())
for _, pod := range podList.Items {
podIP := k8s.GetPodIP(&pod)
if len(podIP) == 0 {
continue
}
if policyType == networking.PolicyTypeEgress {
portList = r.getPortList(pod, rulePorts)
if len(rulePorts) != len(portList) && len(portList) == 0 {
r.logger.Info("Couldn't get matched port list from the pod", "pod", k8s.NamespacedName(&pod), "expectedPorts", rulePorts)
continue
}
}
addresses = append(addresses, policyinfo.EndpointInfo{
CIDR: policyinfo.NetworkAddress(podIP),
Ports: portList,
})
}
return addresses
}
func (r *defaultEndpointsResolver) createPodLabelSelector(ls *metav1.LabelSelector) labels.Selector {
var podSelector labels.Selector
if ls == nil {
podSelector = labels.Everything()
} else {
var err error
if podSelector, err = metav1.LabelSelectorAsSelector(ls); err != nil {
r.logger.Info("Unable to get pod selector", "err", err)
return nil
}
}
return podSelector
}
// getMatchingServiceClusterIPs returns the clusterIPs of the services with service.spec.Selector matching the pod selector
// in the egress rules. This serves as a workaround for the network agent in case of the policy enforcement for egress traffic
// from the pod to the cluster IPs. The network agent limitation arises since it attaches the ebpf probes to the TC hook of the
// pod veth interface and thus unable to see the pod IP after the DNAT happens for the clusterIPs. The current version is limited
// to tracking the services where the service.spec.Selector matches the pod selector in the egress rules.
func (r *defaultEndpointsResolver) getMatchingServiceClusterIPs(ctx context.Context, ls *metav1.LabelSelector, namespace string,
ports []networking.NetworkPolicyPort) []policyinfo.EndpointInfo {
var networkPeers []policyinfo.EndpointInfo
if ls == nil {
ls = &metav1.LabelSelector{}
}
svcSelector, err := metav1.LabelSelectorAsSelector(ls)
if err != nil {
r.logger.Info("Unable to get pod selector", "err", err)
return nil
}
svcList := &corev1.ServiceList{}
if err := r.k8sClient.List(ctx, svcList, &client.ListOptions{
Namespace: namespace,
}); err != nil {
r.logger.Info("Unable to list services", "err", err)
return nil
}
for _, svc := range svcList.Items {
// do not add headless services to policy endpoints
if k8s.IsServiceHeadless(&svc) {
r.logger.Info("skipping headless service when populating EndpointInfo", "serviceName", svc.Name, "serviceNamespace", svc.Namespace)
continue
}
// do not add services if their pod selector is not matching with the pod selector defined in the network policy
if !svcSelector.Matches(labels.Set(svc.Spec.Selector)) {
r.logger.Info("skipping pod selector mismatched service when populating EndpointInfo", "serviceName", svc.Name, "serviceNamespace", svc.Namespace, "expectedPS", svcSelector)
continue
}
var portList []policyinfo.Port
for _, port := range ports {
var portPtr *int32
if port.Port != nil {
portVal, err := r.getMatchingServicePort(ctx, &svc, port.Port, *port.Protocol)
if err != nil {
r.logger.Info("Unable to lookup service port", "err", err)
continue
}
portPtr = &portVal
}
portList = append(portList, policyinfo.Port{
Protocol: port.Protocol,
Port: portPtr,
EndPort: port.EndPort,
})
}
if len(ports) != len(portList) && len(portList) == 0 {
r.logger.Info("Couldn't find matching port for the service", "service", k8s.NamespacedName(&svc))
continue
}
networkPeers = append(networkPeers, policyinfo.EndpointInfo{
CIDR: policyinfo.NetworkAddress(svc.Spec.ClusterIP),
Ports: portList,
})
}
return networkPeers
}
func (r *defaultEndpointsResolver) getMatchingServicePort(ctx context.Context, svc *corev1.Service, port *intstr.IntOrString, protocol corev1.Protocol) (int32, error) {
if port == nil {
return 0, errors.New("unable to lookup service listen port, input port is nil")
}
if portVal, err := k8s.LookupServiceListenPort(svc, *port, protocol); err == nil {
return portVal, nil
} else {
r.logger.Info("Unable to lookup service port", "err", err)
}
// List pods matching the svc selector
podSelector, err := metav1.LabelSelectorAsSelector(metav1.SetAsLabelSelector(svc.Spec.Selector))
if err != nil {
return 0, err
}
podList := &corev1.PodList{}
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
LabelSelector: podSelector,
Namespace: svc.Namespace,
}); err != nil {
r.logger.Info("Unable to List Pods", "err", err)
return 0, err
}
for i := range podList.Items {
if portVal, err := k8s.LookupListenPortFromPodSpec(svc, &podList.Items[i], *port, protocol); err == nil {
return portVal, nil
} else {
r.logger.V(1).Info("The pod doesn't have port matched", "err", err, "pod", podList.Items[i])
}
}
return 0, errors.Errorf("unable to find matching service listen port %s for service %s", port.String(), k8s.NamespacedName(svc))
}
func dedupPorts(policyPorts []policyinfo.Port) []policyinfo.Port {
ports := make(map[string]policyinfo.Port)
for _, port := range policyPorts {
prot, p, ep := "", "", ""
if port.Protocol != nil {
prot = string(*port.Protocol)
}
if port.Port != nil {
p = strconv.FormatInt(int64(*port.Port), 10)
}
if port.EndPort != nil {
ep = strconv.FormatInt(int64(*port.EndPort), 10)
}
ports[fmt.Sprintf("%s@%s@%s", prot, p, ep)] = port
}
if len(ports) > 0 {
return maps.Values(ports)
}
return nil
}