pkg/skoop/plugin/base.go (152 lines of code) (raw):

package plugin import ( "net" "github.com/alibaba/kubeskoop/pkg/skoop/assertions" "github.com/alibaba/kubeskoop/pkg/skoop/k8s" "github.com/alibaba/kubeskoop/pkg/skoop/model" "github.com/alibaba/kubeskoop/pkg/skoop/netstack" "github.com/samber/lo" "golang.org/x/exp/slices" v1 "k8s.io/api/core/v1" ) type Plugin interface { CreatePod(pod *k8s.Pod) (model.NetNodeAction, error) CreateNode(node *k8s.NodeInfo) (model.NetNodeAction, error) } type transmissionFunc func(pkt *model.Packet, iif string) (model.Transmission, error) type SimplePluginNode interface { ToPod(upstream *model.Link, dst model.Endpoint, protocol model.Protocol, pod *v1.Pod) ([]model.Transmission, error) ToHost(upstream *model.Link, dst model.Endpoint, protocol model.Protocol, node *v1.Node) ([]model.Transmission, error) ToService(upstream *model.Link, dst model.Endpoint, protocol model.Protocol, service *v1.Service) ([]model.Transmission, error) ToExternal(upstream *model.Link, dst model.Endpoint, protocol model.Protocol) ([]model.Transmission, error) Serve(upstream *model.Link, dst model.Endpoint, protocol model.Protocol) ([]model.Transmission, error) } type BasePluginNode struct { *model.NetNode IPCache *k8s.IPCache SimplePluginNode SimplePluginNode } func (b *BasePluginNode) Send(dst model.Endpoint, protocol model.Protocol) (trans []model.Transmission, err error) { ipType, err := b.IPCache.GetIPType(dst.IP) if err != nil { return nil, err } switch ipType { case model.EndpointTypePod: pod, err := b.IPCache.GetPodFromIP(dst.IP) if err != nil { return nil, err } return b.SimplePluginNode.ToPod(nil, dst, protocol, pod) case model.EndpointTypeNode: host, err := b.IPCache.GetNodeFromIP(dst.IP) if err != nil { return nil, err } return b.SimplePluginNode.ToHost(nil, dst, protocol, host) case model.EndpointTypeService, model.EndpointTypeLoadbalancer: svc, err := b.IPCache.GetServiceFromIP(dst.IP) if err != nil { return nil, err } return b.SimplePluginNode.ToService(nil, dst, protocol, svc) default: return b.SimplePluginNode.ToExternal(nil, dst, protocol) } } func (b *BasePluginNode) Receive(upstream *model.Link) (trans []model.Transmission, err error) { upstream.Destination = b.NetNode dstIP := upstream.Packet.Dst.String() dstType, err := b.IPCache.GetIPType(dstIP) if err != nil { return nil, err } dst := model.Endpoint{IP: dstIP, Port: upstream.Packet.Dport, Type: dstType} protocol := upstream.Packet.Protocol switch dstType { case model.EndpointTypePod: pod, err := b.IPCache.GetPodFromIP(dst.IP) if err != nil { return nil, err } return b.SimplePluginNode.ToPod(upstream, dst, protocol, pod) case model.EndpointTypeNode: host, err := b.IPCache.GetNodeFromIP(dst.IP) if err != nil { return nil, err } if host.Name == b.ID { //to myself svc, err := b.IPCache.GetServiceFromNodePort(dst.Port, protocol) if err != nil { return nil, err } if svc != nil { return b.SimplePluginNode.ToService(upstream, dst, protocol, svc) } return b.SimplePluginNode.Serve(upstream, dst, protocol) } return b.SimplePluginNode.ToHost(upstream, dst, protocol, host) case model.EndpointTypeService, model.EndpointTypeLoadbalancer: svc, err := b.IPCache.GetServiceFromIP(dst.IP) if err != nil { return nil, err } return b.SimplePluginNode.ToService(upstream, dst, protocol, svc) default: return b.SimplePluginNode.ToExternal(upstream, dst, protocol) } } var _ model.NetNodeAction = &BasePluginNode{} type route struct { routes map[string]assertions.RouteAssertion } func newRoute(routes map[string]assertions.RouteAssertion) *route { if routes != nil { return &route{routes: routes} } return &route{routes: make(map[string]assertions.RouteAssertion)} } func (r *route) AddRoute(cidr string, dev string, gateway *net.IP, scope netstack.Scope) error { _, _, err := net.ParseCIDR(cidr) if err != nil { return err } r.routes[cidr] = assertions.RouteAssertion{ Dev: &dev, Scope: &scope, Gw: gateway, } return nil } func (r *route) Assert(netAssertion *assertions.NetstackAssertion, pkt *model.Packet) error { cidrs := lo.MapToSlice(r.routes, func(k string, _ assertions.RouteAssertion) *net.IPNet { _, n, _ := net.ParseCIDR(k) return n }) matchedCIDR := smallestMatchingCIDR(pkt.Dst, cidrs) if matchedCIDR == nil { return nil } return netAssertion.AssertRoute(r.routes[matchedCIDR.String()], *pkt, "", "") } func smallestMatchingCIDR(ip net.IP, cidr []*net.IPNet) *net.IPNet { matched := lo.Filter(cidr, func(c *net.IPNet, _ int) bool { return c.Contains(ip) }) if len(matched) == 0 { return nil } slices.SortFunc(matched, func(a, b *net.IPNet) bool { onesA, _ := a.Mask.Size() onesB, _ := b.Mask.Size() return onesA > onesB }) return matched[0] } func ack(pkt *model.Packet) *model.Packet { return &model.Packet{ Src: pkt.Dst, Sport: pkt.Dport, Dst: pkt.Src, Dport: pkt.Sport, Protocol: pkt.Protocol, } }