pkg/skoop/service/service.go (179 lines of code) (raw):

package service import ( "context" "fmt" "net" "strings" "github.com/alibaba/kubeskoop/pkg/skoop/utils" "github.com/samber/lo" "k8s.io/utils/pointer" ctx "github.com/alibaba/kubeskoop/pkg/skoop/context" "github.com/alibaba/kubeskoop/pkg/skoop/model" "github.com/alibaba/kubeskoop/pkg/skoop/netstack" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" log "github.com/sirupsen/logrus" "golang.org/x/exp/maps" "golang.org/x/exp/slices" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) type Backend struct { IP string Port uint16 Masquerade bool } type Processor interface { //验证backends是否符合预期 Validate(packet model.Packet, backends []Backend, netns netstack.NetNS) error //根据packet和svc,返回正确的backends Process(packet model.Packet, svc *v1.Service, node *v1.Node) []Backend } type KubeProxyServiceProcessor struct { mode string clusterCIDR *net.IPNet client *kubernetes.Clientset } func (k *KubeProxyServiceProcessor) Validate(packet model.Packet, backends []Backend, netns netstack.NetNS) error { if k.mode != "ipvs" { return nil } if netns.IPVS == nil { log.Errorf("ipvs field in netns is nil which is not expected") return nil } ipvsService := netns.IPVS.GetService(packet.Protocol, packet.Dst.String(), packet.Dport) if ipvsService == nil { return fmt.Errorf("service has not been connfigured in ipvs") } backendsSet := make(map[string]bool) for _, backend := range backends { key := fmt.Sprintf("%s:%d", backend.IP, backend.Port) backendsSet[key] = true } var invalids []string for _, rs := range ipvsService.RS { key := fmt.Sprintf("%s:%d", rs.IP, rs.Port) if _, ok := backendsSet[key]; !ok { invalids = append(invalids, key) } delete(backendsSet, key) } if len(invalids) > 0 { return fmt.Errorf("ipvs realserver %s is not a valid k8s service backend, which could make network issues", strings.Join(invalids, ",")) } if len(backendsSet) != 0 { return fmt.Errorf("k8s endpoint %s is not in ipvs realserver, which could make network issues", strings.Join(maps.Keys(backendsSet), ",")) } return nil } func GetTargetPort(svc *v1.Service, dport uint16, protocol model.Protocol) uint16 { for _, port := range svc.Spec.Ports { if port.Port == int32(dport) && strings.EqualFold(string(port.Protocol), string(protocol)) { //TODO 处理named port if port.TargetPort.Type == intstr.String { klog.Warningf("named port not support now for service %q port %q", svc.Name, port.TargetPort.StrVal) } return uint16(port.TargetPort.IntVal) } } return 0 } func GetNodePort(svc *v1.Service, dport uint16, protocol model.Protocol) uint16 { for _, port := range svc.Spec.Ports { if port.Port == int32(dport) && strings.EqualFold(string(port.Protocol), string(protocol)) { //TODO 处理named port if port.TargetPort.Type == intstr.String { klog.Warningf("named port not support now for service %q port %q", svc.Name, port.TargetPort.StrVal) } return uint16(port.NodePort) } } return 0 } func serviceTargetPortByNodePort(svc *v1.Service, nodePort uint16, protocol model.Protocol) uint16 { for _, port := range svc.Spec.Ports { if strings.EqualFold(string(port.Protocol), string(protocol)) && port.NodePort == int32(nodePort) { return uint16(port.TargetPort.IntVal) } } return 0 } func serviceLBIPs(svc *v1.Service) []string { if svc.Spec.Type != "LoadBalancer" { return nil } var ret []string for _, ingress := range svc.Status.LoadBalancer.Ingress { ret = append(ret, ingress.IP) } return ret } func isTrafficLocalService(svc *v1.Service) bool { return svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal } func (k *KubeProxyServiceProcessor) shouldMasquerade(packet model.Packet, svc *v1.Service) (bool, uint16) { masquerade := false targetPort := GetTargetPort(svc, packet.Dport, packet.Protocol) dst := packet.Dst.String() if targetPort != 0 && slices.Contains(serviceLBIPs(svc), dst) { masquerade = !isTrafficLocalService(svc) } else if targetPort != 0 && dst == svc.Spec.ClusterIP && k.clusterCIDR != nil { masquerade = !k.clusterCIDR.Contains(packet.Src) } else { targetPortByNodePort := serviceTargetPortByNodePort(svc, packet.Dport, packet.Protocol) if targetPortByNodePort != 0 { targetPort = targetPortByNodePort masquerade = !isTrafficLocalService(svc) } else if slices.Contains(svc.Spec.ExternalIPs, dst) { masquerade = !isTrafficLocalService(svc) } } return masquerade, targetPort } func (k *KubeProxyServiceProcessor) Process(packet model.Packet, svc *v1.Service, node *v1.Node) []Backend { masquerade, targetPort := k.shouldMasquerade(packet, svc) ep, err := k.client.CoreV1().Endpoints(svc.Namespace).Get(context.TODO(), svc.Name, metav1.GetOptions{}) if err != nil { log.Errorf("error list endponts for service") return nil } localBackend := isExternalTraffic(svc, node, packet) && svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal var ret []Backend for _, ss := range ep.Subsets { for _, addr := range ss.Addresses { if node != nil && localBackend && pointer.StringDeref(addr.NodeName, "") != node.Name { continue } backend := Backend{ IP: addr.IP, Port: targetPort, Masquerade: masquerade, } ret = append(ret, backend) } } return ret } func isExternalTraffic(svc *v1.Service, node *v1.Node, pkt model.Packet) bool { if utils.ContainsLoadBalancerIP(svc, pkt.Dst.String()) { return true } if node == nil { return false } if !lo.ContainsBy(node.Status.Addresses, func(a v1.NodeAddress) bool { return a.Address == pkt.Dst.String() }) { return false } return lo.ContainsBy(svc.Spec.Ports, func(p v1.ServicePort) bool { return p.NodePort == int32(pkt.Dport) }) } func NewKubeProxyServiceProcessor(ctx *ctx.Context) *KubeProxyServiceProcessor { return &KubeProxyServiceProcessor{ mode: ctx.ClusterConfig().ProxyMode, clusterCIDR: ctx.ClusterConfig().ClusterCIDR, client: ctx.KubernetesClient(), } } var _ Processor = &KubeProxyServiceProcessor{}