pkg/skoop/plugin/networkpolicy.go (354 lines of code) (raw):
package plugin
import (
"context"
"fmt"
"net"
"github.com/alibaba/kubeskoop/pkg/skoop/k8s"
model "github.com/alibaba/kubeskoop/pkg/skoop/model"
"github.com/alibaba/kubeskoop/pkg/skoop/service"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientset "k8s.io/client-go/kubernetes"
)
type NetworkPolicyHandler interface {
CheckNetworkPolicy(src, dst model.Endpoint, protocol model.Protocol) ([]model.Suspicion, error)
}
type networkPolicy struct {
// serviceAddrSkipCidrRule
// skip check service address in egress
// e.g. Calico
serviceAddrSkipCidrRule bool
// inClusterAddrEmitCidrRule
// if true: then in cluster resource, if label not match, reject it even ip in cidr block
// e.g. Cilium
inClusterAddrEmitCidrRule bool
policies []v1.NetworkPolicy
ipCache *k8s.IPCache
k8sCli *clientset.Clientset
service service.Processor
}
func NewNetworkPolicy(serviceAddrSkipCidrRule bool, inClusterAddrEmitCidrRule bool, ipCache *k8s.IPCache, k8sCli *clientset.Clientset, service service.Processor) (NetworkPolicyHandler, error) {
np := &networkPolicy{serviceAddrSkipCidrRule: serviceAddrSkipCidrRule, inClusterAddrEmitCidrRule: inClusterAddrEmitCidrRule, ipCache: ipCache, k8sCli: k8sCli, service: service}
policyList, err := np.k8sCli.NetworkingV1().NetworkPolicies("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
np.policies = policyList.Items
return np, nil
}
func (np *networkPolicy) CheckNetworkPolicy(src, dst model.Endpoint, protocol model.Protocol) ([]model.Suspicion, error) {
var denies []*v1.NetworkPolicy
var ret []model.Suspicion
if src.Type == model.EndpointTypePod {
pod, err := np.ipCache.GetPodFromIP(src.IP)
if err != nil {
return nil, fmt.Errorf("error get pod from ip ipCache: %v", err)
}
if dst.Type != model.EndpointTypeService && dst.Type != model.EndpointTypeLoadbalancer {
nps, err := np.checkEgress(pod, dst, protocol)
if err != nil {
return nil, fmt.Errorf("error check egress policies: %v", err)
}
denies = append(denies, nps...)
} else if !np.serviceAddrSkipCidrRule {
// pod -> svc
nps, err := np.checkEgress(pod, dst, protocol)
if err != nil {
return nil, fmt.Errorf("error check egress policies: %v", err)
}
denies = append(denies, nps...)
}
}
if dst.Type == model.EndpointTypePod {
pod, err := np.ipCache.GetPodFromIP(dst.IP)
if err != nil {
return nil, fmt.Errorf("error get pod from ip ipCache: %v", err)
}
nps, err := np.checkIngress(pod, src, protocol)
if err != nil {
return nil, fmt.Errorf("error check ingress policies: %v", err)
}
denies = append(denies, nps...)
}
if dst.Type == model.EndpointTypeService || dst.Type == model.EndpointTypeLoadbalancer {
svc, err := np.ipCache.GetServiceFromIP(dst.IP)
if err != nil {
return nil, fmt.Errorf("error get service(%v) from ip ipCache: %v", dst.IP, err)
}
backends := np.service.Process(model.Packet{
Src: net.ParseIP(src.IP),
Sport: src.Port,
Dst: net.ParseIP(dst.IP),
Dport: dst.Port,
Protocol: protocol,
}, svc, nil)
for _, backend := range backends {
if backend.IP == dst.IP {
return nil, fmt.Errorf("service network loop")
}
backendType, err := np.ipCache.GetIPType(backend.IP)
if err != nil {
return nil, err
}
dst := model.Endpoint{
IP: backend.IP,
Type: backendType,
Port: backend.Port,
}
sub, err := np.CheckNetworkPolicy(src, dst, protocol)
if err != nil {
return ret, err
}
ret = append(ret, sub...)
}
}
for _, np := range denies {
ret = append(ret, model.Suspicion{Level: model.SuspicionLevelCritical, Message: fmt.Sprintf("network policy %v/%v deny the packet from %v to(%v) %v:%v",
np.Namespace, np.Name,
src.IP, protocol, dst.IP, dst.Port),
})
}
return ret, nil
}
func (np *networkPolicy) checkEgress(pod *corev1.Pod, dst model.Endpoint, protocol model.Protocol) ([]*v1.NetworkPolicy, error) {
var denies []*v1.NetworkPolicy
for _, policy := range np.policies {
match, err := np.policyMatchPod(&policy, pod)
if err != nil {
return nil, err
}
if match {
dstPodOrNil, err := np.ipCache.GetPodFromIP(dst.IP)
if err != nil {
return nil, err
}
deny, err := np.checkEgressPolicyVerdict(&policy, dstPodOrNil, dst, protocol)
if err != nil {
return nil, err
} else if deny {
denies = append(denies, &policy)
}
}
}
return denies, nil
}
func toV1Protocol(p model.Protocol) corev1.Protocol {
switch p {
case model.TCP:
return corev1.ProtocolTCP
case model.UDP:
return corev1.ProtocolUDP
default:
return corev1.ProtocolTCP
}
}
func (np *networkPolicy) containsPortWithProtocol(port uint16, protocol model.Protocol, ports []v1.NetworkPolicyPort) bool {
// no port means no limit
if len(ports) == 0 {
return true
}
// todo: support named port
for _, p := range ports {
if p.Port == nil {
continue
}
policyProtocol := corev1.ProtocolTCP
// If not specified, this field defaults to TCP.
if p.Protocol != nil {
policyProtocol = *p.Protocol
}
if policyProtocol != toV1Protocol(protocol) {
continue
}
from := p.Port.IntVal
if from <= 0 {
continue
}
to := from
if p.EndPort != nil {
to = *p.EndPort
}
if port >= uint16(from) && port <= uint16(to) {
return true
}
}
return false
}
func (np *networkPolicy) checkEgressPolicyVerdict(policy *v1.NetworkPolicy, dstPod *corev1.Pod, dst model.Endpoint, protocol model.Protocol) (deny bool, err error) {
if !np.hasPolicyType(policy, v1.PolicyTypeEgress) {
return false, nil
}
for _, egressRule := range policy.Spec.Egress {
if !np.containsPortWithProtocol(dst.Port, protocol, egressRule.Ports) {
continue
}
for _, to := range egressRule.To {
if dstPod != nil {
if to.PodSelector != nil {
if dstPod.GetNamespace() != policy.GetNamespace() {
continue
}
selector, err := metav1.LabelSelectorAsSelector(to.PodSelector)
if err != nil {
return false, err
}
if selector.Empty() || selector.Matches(labels.Set(dstPod.Labels)) {
return false, nil
}
} else if to.NamespaceSelector != nil {
selector, err := metav1.LabelSelectorAsSelector(to.NamespaceSelector)
if err != nil {
return false, err
}
if selector.Empty() {
return false, nil
}
dstNamespace, err := np.k8sCli.CoreV1().Namespaces().Get(context.Background(), dstPod.GetNamespace(), metav1.GetOptions{})
if err != nil {
return false, err
}
if selector.Matches(labels.Set(dstNamespace.Labels)) {
return false, nil
}
}
}
if to.IPBlock != nil {
if dstPod != nil && np.inClusterAddrEmitCidrRule {
continue
}
contains, err := np.strCidrContainsIP(to.IPBlock.CIDR, dst.IP)
if err != nil {
return false, err
}
if contains {
var ipExcept bool
for _, exceptCIDR := range to.IPBlock.Except {
except, err := np.strCidrContainsIP(exceptCIDR, dst.IP)
if err != nil {
return false, err
}
if except {
ipExcept = true
break
}
}
if !ipExcept {
return false, nil
}
}
}
}
}
return true, nil
}
func (np *networkPolicy) strCidrContainsIP(cidr, ip string) (bool, error) {
_, subnet, err := net.ParseCIDR(cidr)
if err != nil {
return false, err
}
return subnet.Contains(net.ParseIP(ip)), nil
}
func (np *networkPolicy) hasPolicyType(policy *v1.NetworkPolicy, pt v1.PolicyType) bool {
for _, p := range policy.Spec.PolicyTypes {
if p == pt {
return true
}
}
return false
}
func (np *networkPolicy) policyMatchPod(policy *v1.NetworkPolicy, pod *corev1.Pod) (bool, error) {
if policy.GetNamespace() != pod.GetNamespace() {
return false, nil
}
selector, err := metav1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
if err != nil {
return false, err
}
if !selector.Empty() && !selector.Matches(labels.Set(pod.Labels)) {
return false, nil
}
return true, nil
}
func (np *networkPolicy) checkIngress(pod *corev1.Pod, src model.Endpoint, protocol model.Protocol) ([]*v1.NetworkPolicy, error) {
var denies []*v1.NetworkPolicy
for _, policy := range np.policies {
match, err := np.policyMatchPod(&policy, pod)
if err != nil {
return nil, err
}
if match {
srcPodOrNil, err := np.ipCache.GetPodFromIP(src.IP)
if err != nil {
return nil, fmt.Errorf("error get src pod(%v) from ip cache: %v", src.IP, err)
}
deny, err := np.checkIngressPolicyVerdict(&policy, srcPodOrNil, src, protocol)
if err != nil {
return nil, err
}
if deny {
denies = append(denies, &policy)
}
}
}
return denies, nil
}
func (np *networkPolicy) checkIngressPolicyVerdict(policy *v1.NetworkPolicy, srcPod *corev1.Pod, src model.Endpoint, protocol model.Protocol) (deny bool, err error) {
if !np.hasPolicyType(policy, v1.PolicyTypeIngress) {
return false, nil
}
for _, ingressRule := range policy.Spec.Ingress {
if !np.containsPortWithProtocol(src.Port, protocol, ingressRule.Ports) {
continue
}
for _, from := range ingressRule.From {
if srcPod != nil {
if from.PodSelector != nil {
if srcPod.GetNamespace() != policy.GetNamespace() {
continue
}
selector, err := metav1.LabelSelectorAsSelector(from.PodSelector)
if err != nil {
return false, err
}
if selector.Empty() || selector.Matches(labels.Set(srcPod.Labels)) {
return false, nil
}
} else if from.NamespaceSelector != nil {
selector, err := metav1.LabelSelectorAsSelector(from.NamespaceSelector)
if err != nil {
return false, err
}
if selector.Empty() {
return false, nil
}
srcNamespace, err := np.k8sCli.CoreV1().Namespaces().Get(context.Background(), srcPod.GetName(), metav1.GetOptions{})
if err != nil {
return false, err
}
if selector.Matches(labels.Set(srcNamespace.Labels)) {
return false, nil
}
}
}
if from.IPBlock != nil {
if srcPod != nil && np.inClusterAddrEmitCidrRule {
continue
}
contains, err := np.strCidrContainsIP(from.IPBlock.CIDR, src.IP)
if err != nil {
return false, err
}
if !contains {
continue
}
var ipExcept bool
for _, exceptCIDR := range from.IPBlock.Except {
except, err := np.strCidrContainsIP(exceptCIDR, src.IP)
if err != nil {
return false, err
}
if except {
ipExcept = true
break
}
}
if !ipExcept {
return false, nil
}
}
}
}
return true, nil
}