pkg/skoop/plugin/flannel.go (875 lines of code) (raw):

package plugin

import (
	"context"
	"fmt"
	"net"
	"strings"

	"golang.org/x/exp/slices"

	"github.com/spf13/pflag"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"

	"github.com/alibaba/kubeskoop/pkg/skoop/assertions"
	ctx "github.com/alibaba/kubeskoop/pkg/skoop/context"
	"github.com/alibaba/kubeskoop/pkg/skoop/k8s"
	"github.com/alibaba/kubeskoop/pkg/skoop/model"
	"github.com/alibaba/kubeskoop/pkg/skoop/netstack"
	"github.com/alibaba/kubeskoop/pkg/skoop/network"
	"github.com/alibaba/kubeskoop/pkg/skoop/service"
	"github.com/alibaba/kubeskoop/pkg/skoop/utils"
	"github.com/samber/lo"
	v1 "k8s.io/api/core/v1"
)

// FlannelConfig holds the flag-configurable settings of the flannel plugin.
// Its fields are populated by BindFlags.
type FlannelConfig struct {
	BackendType string
	Bridge      string
	PodMTU      int
	IPMasq      bool
	Interface   string
}

var (
	// supportedFlannelBackendType lists the backend type values accepted by
	// FlannelConfig.Validate.
	supportedFlannelBackendType = []string{"host-gw", "vxlan", "alloc"}

	// flannelMulticastCIDR is 224.0.0.0/4; masquerade never rewrites packets
	// whose destination falls in this range.
	flannelMulticastCIDR = &net.IPNet{IP: net.IPv4(224, 0, 0, 0), Mask: net.CIDRMask(4, 32)}
)

// BindFlags registers the flannel plugin flags on fs.
func (f *FlannelConfig) BindFlags(fs *pflag.FlagSet) {
	fs.StringVarP(&f.BackendType, "flannel-backend-type", "", "",
		"Backend type for flannel plugin, support host-gw,vxlan,alloc. If not set, it will auto detect from flannel config.")
	fs.StringVarP(&f.Bridge, "flannel-bridge", "", "cni0", "Bridge name for flannel plugin.")
	fs.IntVarP(&f.PodMTU, "flannel-pod-mtu", "", 0,
		"Pod MTU for flannel plugin. If not set, it will auto detect from flannel cni mode (1450 for vxlan, 1500 for others).")
	fs.BoolVarP(&f.IPMasq, "flannel-ip-masq", "", true, "Should do IP masquerade for flannel plugin.")
	fs.StringVarP(&f.Interface, "flannel-host-interface", "", "", "Host interface for flannel plugin.")
}

// Validate rejects a non-empty BackendType that is not one of the supported
// backend types. An empty BackendType is valid (it means "auto detect").
func (f *FlannelConfig) Validate() error {
	if f.BackendType != "" && !slices.Contains(supportedFlannelBackendType, f.BackendType) {
		return fmt.Errorf("unsupported flannel backend type %q, should be %s",
			f.BackendType, strings.Join(supportedFlannelBackendType, ","))
	}
	return nil
}

// Flannel is the package-level flannel configuration bound to the command line.
var Flannel = &FlannelConfig{}

func init() {
	ctx.RegisterConfigBinder("Flannel plugin", Flannel)
}

const (
	// flannelVxlanInterface is the vxlan device name asserted for the vxlan backend.
	flannelVxlanInterface = "flannel.1"
)

// FlannelBackendType names a flannel backend.
type FlannelBackendType string

const (
	FlannelBackendTypeHostGW FlannelBackendType = "host-gw"
	FlannelBackendTypeVxlan  FlannelBackendType = "vxlan"
	FlannelBackendTypeAlloc  FlannelBackendType = "alloc"
)

// FlannelPluginOptions carries everything NewFlannelPluginWithOptions needs to
// build a flannel plugin.
type FlannelPluginOptions struct {
	InfraShim        network.InfraShim
	Bridge           string
	Interface        string
	PodMTU           int
	IPMasq           bool
	ClusterCIDR      *net.IPNet
	CNIMode          FlannelBackendType
	ServiceProcessor service.Processor
}

// getFlannelConfigMap looks up the "kube-flannel-cfg" ConfigMap, first in the
// "kube-flannel" namespace and then in "kube-system".
//
// It returns (nil, nil) when the ConfigMap exists in neither namespace, and a
// non-nil error for any lookup failure other than NotFound.
func getFlannelConfigMap(ctx *ctx.Context) (*v1.ConfigMap, error) {
	candidates := []struct {
		Namespace string
		Name      string
	}{
		{"kube-flannel", "kube-flannel-cfg"},
		{"kube-system", "kube-flannel-cfg"},
	}

	for _, c := range candidates {
		cm, err := ctx.KubernetesClient().CoreV1().
			ConfigMaps(c.Namespace).Get(context.TODO(), c.Name, metav1.GetOptions{})
		if err == nil {
			return cm, nil
		}
		if !errors.IsNotFound(err) {
			return nil, err
		}
	}

	// Nothing found: report that explicitly instead of handing back whatever
	// object the last failed Get happened to return.
	return nil, nil
}

// getFlannelCNIMode detects the flannel backend type by searching the
// "net-conf.json" entry of the flannel ConfigMap for a known backend name
// (checked in the order vxlan, host-gw, alloc). It falls back to host-gw when
// the ConfigMap is missing or none of the names is present.
func getFlannelCNIMode(ctx *ctx.Context) (FlannelBackendType, error) {
	cm, err := getFlannelConfigMap(ctx)
	if err != nil {
		return "", err
	}

	if cm == nil {
		klog.V(3).Infof("Can not detect flannel backend type, use \"host-gw\" as default.")
		return FlannelBackendTypeHostGW, nil
	}

	conf := cm.Data["net-conf.json"]
	if strings.Contains(conf, "vxlan") {
		return FlannelBackendTypeVxlan, nil
	}
	if strings.Contains(conf, "host-gw") {
		return FlannelBackendTypeHostGW, nil
	}
	if strings.Contains(conf, "alloc") {
		return FlannelBackendTypeAlloc, nil
	}

	klog.V(3).Infof("Can not detect flannel backend type or unsupported type, use \"host-gw\" as default.")
	return FlannelBackendTypeHostGW, nil
}

// flannelPlugin builds pod and node network nodes for a flannel cluster.
type flannelPlugin struct {
	hostOptions      *flannelHostOptions
	serviceProcessor service.Processor
	podMTU           int
	infraShim        network.InfraShim
	ipCache          *k8s.IPCache
}

// CreatePod builds the pod node as a simple veth pod whose interface is "eth0"
// with the plugin's pod MTU.
func (f *flannelPlugin) CreatePod(pod *k8s.Pod) (model.NetNodeAction, error) {
	return newSimpleVEthPod(pod, f.ipCache, f.podMTU, "eth0")
}

// CreateNode builds the host node for node and wraps it in a BasePluginNode.
func (f *flannelPlugin) CreateNode(node *k8s.NodeInfo) (model.NetNodeAction, error) {
	flannelHost, err := newFlannelHost(f.ipCache, node, f.infraShim, f.serviceProcessor, f.hostOptions)
	if err != nil {
		return nil, err
	}
	return &BasePluginNode{
		NetNode:          flannelHost.netNode,
		IPCache:          f.ipCache,
		SimplePluginNode: flannelHost,
	}, nil
}

// NewFlannelPluginWithOptions builds a flannel plugin from explicit options.
// The IP cache is taken from the cluster config held by ctx.
func NewFlannelPluginWithOptions(ctx *ctx.Context, options *FlannelPluginOptions) (Plugin, error) {
	return &flannelPlugin{
		podMTU: options.PodMTU,
		hostOptions: &flannelHostOptions{
			Bridge:      options.Bridge,
			ClusterCIDR: options.ClusterCIDR,
			Interface:   options.Interface,
			CNIMode:     options.CNIMode,
			IPMasq:      options.IPMasq,
		},
		serviceProcessor: options.ServiceProcessor,
		infraShim:        options.InfraShim,
		ipCache:          ctx.ClusterConfig().IPCache,
	}, nil
}

// NewFlannelPlugin builds a flannel plugin from the package-level Flannel
// configuration. An unset backend type is auto-detected from the cluster, and
// an unset pod MTU defaults to 1450 for vxlan and 1500 otherwise.
func NewFlannelPlugin(ctx *ctx.Context, serviceProcessor service.Processor, infraShim network.InfraShim) (Plugin, error) {
	options := &FlannelPluginOptions{
		InfraShim:        infraShim,
		Bridge:           Flannel.Bridge,
		Interface:        Flannel.Interface,
		PodMTU:           Flannel.PodMTU,
		IPMasq:           Flannel.IPMasq,
		ClusterCIDR:      ctx.ClusterConfig().ClusterCIDR,
		CNIMode:          FlannelBackendType(Flannel.BackendType),
		ServiceProcessor: serviceProcessor,
	}

	if options.CNIMode == "" {
		mode, err := getFlannelCNIMode(ctx)
		if err != nil {
			return nil, err
		}
		options.CNIMode = mode
	}

	if options.PodMTU == 0 {
		mtu := 1500
		if options.CNIMode == FlannelBackendTypeVxlan {
			mtu = 1450
		}
		options.PodMTU = mtu
	}

	return NewFlannelPluginWithOptions(ctx, options)
}

// flannelNodeInfo is the per-destination-node data derived from a node's
// pod CIDR and flannel annotations (see getDstNodeInfo).
type flannelNodeInfo struct {
	Vtep        net.IP
	CIDR        *net.IPNet
	BackendType FlannelBackendType
	NodeIP      net.IP
	Dev         *netstack.Interface
	Route       assertions.RouteAssertion
}

// flannelRoute extends the generic route checks with flannel-specific
// handling of destinations located on other nodes of the cluster.
type flannelRoute struct {
	*route
	localNetAssertion *assertions.NetstackAssertion
	localPodCIDR      *net.IPNet
	clusterCIDR       *net.IPNet
	localVTEP         net.IP
	ipCache           *k8s.IPCache
	nodeInfoCache     map[string]*flannelNodeInfo
	cniMode           FlannelBackendType
	iface             string
}

// newFlannelRoute builds the route helper for the local node. The local VTEP
// address is parsed from the node's "flannel.alpha.coreos.com/public-ip"
// annotation and is nil when that annotation is missing or unparsable.
func newFlannelRoute(parentRoute map[string]assertions.RouteAssertion, localPodCIDR *net.IPNet, clusterCIDR *net.IPNet,
	localNetAssertion *assertions.NetstackAssertion, localNode *v1.Node, ipCache *k8s.IPCache, cniMode FlannelBackendType, iface string) *flannelRoute {
	return &flannelRoute{
		route:             newRoute(parentRoute),
		localNetAssertion: localNetAssertion,
		localPodCIDR:      localPodCIDR,
		clusterCIDR:       clusterCIDR,
		localVTEP:         net.ParseIP(localNode.Annotations["flannel.alpha.coreos.com/public-ip"]),
		ipCache:           ipCache,
		nodeInfoCache:     map[string]*flannelNodeInfo{},
		cniMode:           cniMode,
		iface:             iface,
	}
}

// AssertBackend checks the local egress device used to reach the node hosting
// pkt's destination pod: the device must be up with the expected MTU and have
// IPv4 forwarding enabled; for the vxlan backend the VTEP entry is asserted
// as well. It is a no-op when the destination is not a known pod.
func (r *flannelRoute) AssertBackend(pkt *model.Packet) error {
	hostInfo, err := r.getDstInfo(pkt)
	if err != nil {
		return err
	}
	if hostInfo == nil || hostInfo.Dev == nil {
		return nil
	}

	r.localNetAssertion.AssertNetDevice(hostInfo.Dev.Name, netstack.Interface{
		State: netstack.LinkUP,
		MTU:   hostInfo.Dev.MTU,
	})
	r.localNetAssertion.AssertSysctls(map[string]string{
		fmt.Sprintf("net.ipv4.conf.%s.forwarding", utils.ConvertNICNameInSysctls(hostInfo.Dev.Name)): "1",
	}, model.SuspicionLevelFatal)

	if hostInfo.BackendType == FlannelBackendTypeVxlan {
		return r.localNetAssertion.AssertVxlanVtep(hostInfo.Vtep, hostInfo.NodeIP, flannelVxlanInterface)
	}
	return nil
}

// Encap returns the packet as it leaves the node. For a vxlan destination the
// packet is wrapped in a UDP packet to port 8472 of the destination node,
// carrying a deep copy of the original as its Encap payload; otherwise the
// original packet is returned unchanged.
func (r *flannelRoute) Encap(opkt *model.Packet) (*model.Packet, error) {
	hostInfo, err := r.getDstInfo(opkt)
	if err != nil {
		return nil, err
	}
	if hostInfo == nil || hostInfo.BackendType != FlannelBackendTypeVxlan {
		return opkt, nil
	}

	return &model.Packet{
		Src:      r.localVTEP,
		Sport:    opkt.Sport,
		Dst:      hostInfo.NodeIP,
		Dport:    8472,
		Protocol: model.UDP,
		Encap:    opkt.DeepCopy(),
	}, nil
}

// Decap unwraps an encapsulated packet. It returns an error when the outer
// destination is not the local VTEP address, and returns opkt itself when the
// packet carries no encapsulated payload.
func (r *flannelRoute) Decap(opkt *model.Packet) (*model.Packet, error) {
	if opkt.Encap == nil {
		return opkt, nil
	}
	if !opkt.Dst.Equal(r.localVTEP) {
		return nil, fmt.Errorf("encap dst %s not match local vtep %s", opkt.Dst, r.localVTEP)
	}
	return opkt.Encap, nil
}

// getDstInfo resolves the node info for the node hosting the pod that owns
// pkt.Dst. It returns (nil, nil) when the destination is not a known pod IP.
func (r *flannelRoute) getDstInfo(pkt *model.Packet) (*flannelNodeInfo, error) {
	pod, err := r.ipCache.GetPodFromIP(pkt.Dst.String())
	if err != nil {
		return nil, err
	}
	if pod == nil {
		return nil, nil
	}
	return r.getDstNodeInfo(pod.Spec.NodeName)
}

// getDstNodeInfo builds (and caches per node name) the flannel info of a
// destination node from its pod CIDR and its flannel public-ip / backend-type
// annotations. It fails when the node is unknown, its pod CIDR cannot be
// parsed, or either annotation is missing.
func (r *flannelRoute) getDstNodeInfo(nodeName string) (*flannelNodeInfo, error) {
	if info, ok := r.nodeInfoCache[nodeName]; ok {
		return info, nil
	}

	node, err := r.ipCache.GetNodeFromName(nodeName)
	if err != nil {
		return nil, err
	}
	if node == nil {
		return nil, fmt.Errorf("node %s not found in k8s nodes", nodeName)
	}

	ip, cidr, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return nil, err
	}

	nextNodeIP := node.Annotations["flannel.alpha.coreos.com/public-ip"]
	if nextNodeIP == "" {
		return nil, fmt.Errorf("node %s does not have flannel public-ip annotation", nodeName)
	}

	backendTypeAnnotation, ok := node.Annotations["flannel.alpha.coreos.com/backend-type"]
	if !ok {
		return nil, fmt.Errorf("node %s does not have flannel backend-type annotation", nodeName)
	}

	var (
		backendType FlannelBackendType
		dev         *netstack.Interface
		vtep        net.IP
	)
	switch {
	case strings.EqualFold(backendTypeAnnotation, string(FlannelBackendTypeVxlan)):
		// vxlan: the gateway is the address part of the remote pod CIDR and
		// traffic leaves through the flannel vxlan device.
		vtep = ip
		dev = &netstack.Interface{
			Name:   flannelVxlanInterface,
			MTU:    1450,
			Driver: "vxlan",
		}
		backendType = FlannelBackendTypeVxlan
	case strings.EqualFold(backendTypeAnnotation, string(FlannelBackendTypeHostGW)):
		// host-gw: the gateway is the remote node's public IP.
		vtep = net.ParseIP(nextNodeIP)
		backendType = FlannelBackendTypeHostGW
	default:
		// Any other annotation value: fall back to the configured mode, with
		// no gateway expectation.
		backendType = r.cniMode
	}

	// Every backend except vxlan sends through the host interface.
	if dev == nil {
		dev = &netstack.Interface{
			Name: r.iface,
		}
	}

	routeAssertion := assertions.RouteAssertion{Dev: &dev.Name}
	if vtep != nil && !vtep.IsUnspecified() {
		routeAssertion.Gw = &vtep
	}

	info := &flannelNodeInfo{
		Vtep:        vtep,
		CIDR:        cidr,
		BackendType: backendType,
		NodeIP:      net.ParseIP(nextNodeIP),
		Dev:         dev,
		Route:       routeAssertion,
	}
	r.nodeInfoCache[nodeName] = info
	return info, nil
}

// Assert checks the route used for pkt. Destinations inside the local pod
// CIDR or outside the cluster CIDR are delegated to the embedded generic
// route; other cluster destinations are checked against the route expected
// for the node hosting the destination pod (nothing is asserted when the
// destination is not a known pod).
func (r *flannelRoute) Assert(netAssertion *assertions.NetstackAssertion, pkt *model.Packet) error {
	if (r.localPodCIDR != nil && r.localPodCIDR.Contains(pkt.Dst)) || !r.clusterCIDR.Contains(pkt.Dst) {
		return r.route.Assert(netAssertion, pkt)
	}

	nodeInfo, err := r.getDstInfo(pkt)
	if err != nil {
		return err
	}
	if nodeInfo == nil {
		return nil
	}
	return netAssertion.AssertRoute(nodeInfo.Route, *pkt, "", "")
}

// flannelHost models a cluster node running flannel.
type flannelHost struct {
	nodeInfo         *k8s.NodeInfo
	netNode          *model.NetNode
	ipCache          *k8s.IPCache
	infraShim        network.InfraShim
	serviceProcessor service.Processor

	bridge      string
	clusterCIDR *net.IPNet
	podCIDR     *net.IPNet
	iface       string
	cniMode     FlannelBackendType
	ipMasq      bool
	gateway     net.IP

	net   *assertions.NetstackAssertion
	k8s   *assertions.KubernetesAssertion
	route *flannelRoute
}

// flannelHostOptions is the node-independent configuration shared by all
// flannelHost instances of a plugin.
type flannelHostOptions struct {
	Bridge      string
	ClusterCIDR *net.IPNet
	Interface   string
	CNIMode     FlannelBackendType
	IPMasq      bool
	Gateway     net.IP
}

// newFlannelHost builds the host model for nodeInfo, detects the host
// interface when none is configured, initializes the route helper and runs the
// basic node checks. It fails when the node is unknown to the IP cache, its
// pod CIDR cannot be parsed, or no default host interface can be found.
func newFlannelHost(ipCache *k8s.IPCache, nodeInfo *k8s.NodeInfo, infraShim network.InfraShim,
	serviceProcessor service.Processor, options *flannelHostOptions) (*flannelHost, error) {
	//if serviceProcessor == nil {
	//	return nil, fmt.Errorf("service processor cannot be nil")
	//}

	k8sNode, err := ipCache.GetNodeFromName(nodeInfo.NodeName)
	if err != nil {
		return nil, err
	}
	// getDstNodeInfo treats a nil node as "not found"; guard here as well so
	// an unknown node is reported as an error instead of a nil dereference.
	if k8sNode == nil {
		return nil, fmt.Errorf("node %s not found in k8s nodes", nodeInfo.NodeName)
	}

	_, podCIDR, err := net.ParseCIDR(k8sNode.Spec.PodCIDR)
	if err != nil {
		return nil, err
	}

	netNode := model.NewNetNode(nodeInfo.NodeName, model.NetNodeTypeNode)
	assertion := assertions.NewNetstackAssertion(netNode, &nodeInfo.NetNS)
	k8sAssertion := assertions.NewKubernetesAssertion(netNode)

	host := &flannelHost{
		netNode:          netNode,
		nodeInfo:         nodeInfo,
		ipCache:          ipCache,
		infraShim:        infraShim,
		podCIDR:          podCIDR,
		net:              assertion,
		k8s:              k8sAssertion,
		bridge:           options.Bridge,
		clusterCIDR:      options.ClusterCIDR,
		iface:            options.Interface,
		cniMode:          options.CNIMode,
		ipMasq:           options.IPMasq,
		gateway:          options.Gateway,
		serviceProcessor: serviceProcessor,
	}

	if host.iface == "" {
		host.iface = netstack.LookupDefaultIfaceName(nodeInfo.NetNSInfo.Interfaces)
		if host.iface == "" {
			return nil, fmt.Errorf("cannot lookup default host interface, please manually specify it via --flannel-host-interface")
		}
		klog.V(5).Infof("detected host interface %s on node %s", host.iface, host.nodeInfo.NodeName)
	}

	if err := host.initRoute(); err != nil {
		return nil, err
	}
	if err := host.basicCheck(); err != nil {
		return nil, err
	}

	return host, nil
}

var _ SimplePluginNode = &flannelHost{}

// transmissionToPod builds the transmission that carries pkt to pod. A
// non-host-network pod on this node is reached over a veth link from the
// bridge; any other pod is reached through the node that hosts it.
func (h *flannelHost) transmissionToPod(pkt *model.Packet, pod *v1.Pod, iif string) (model.Transmission, error) {
	if !pod.Spec.HostNetwork && pod.Spec.NodeName == h.nodeInfo.NodeName {
		// check local veth
		key := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
		if peerNetNS, ok := lo.Find(h.nodeInfo.SubNetNSInfo, func(info netstack.NetNSInfo) bool { return info.Key == key }); ok {
			// Assert against the configured bridge, not a hard-coded "cni0".
			h.net.AssertVEthPeerBridge("eth0", &peerNetNS, h.bridge)
		}

		// send to local pod
		if err := h.checkRoute(pkt); err != nil {
			return model.Transmission{}, err
		}

		pktOut := pkt.DeepCopy()
		if err := h.masquerade(pktOut); err != nil {
			return model.Transmission{}, err
		}

		link := &model.Link{
			Type:            model.LinkVeth,
			Source:          h.netNode,
			Packet:          pktOut,
			SourceAttribute: model.SimpleLinkAttribute{Interface: h.bridge},
		}
		nextHop := model.Hop{
			Type: model.NetNodeTypePod,
			ID:   pkt.Dst.String(),
		}
		return model.Transmission{
			NextHop: nextHop,
			Link:    link,
		}, nil
	}

	node, err := h.ipCache.GetNodeFromName(pod.Spec.NodeName)
	if err != nil {
		return model.Transmission{}, err
	}
	// transmissionToNode dereferences the node; report an unknown node as an
	// error rather than panicking.
	if node == nil {
		return model.Transmission{}, fmt.Errorf("node %s not found in k8s nodes", pod.Spec.NodeName)
	}

	return h.transmissionToNode(pkt, node, iif)
}

// transmissionToNode builds the transmission that carries pkt to another
// node: it masquerades a copy of the packet, checks routes and the backend
// device, encapsulates if required, consults the infra shim (when set) and
// emits an infra link out of the host interface.
func (h *flannelHost) transmissionToNode(pkt *model.Packet, node *v1.Node, _ string) (model.Transmission, error) {
	pktOut := pkt.DeepCopy()
	err := h.masquerade(pktOut)
	if err != nil {
		return model.Transmission{}, err
	}

	// The route check runs on the original packet, before masquerade.
	err = h.checkRoute(pkt)
	if err != nil {
		return model.Transmission{}, err
	}

	err = h.route.AssertBackend(pktOut)
	if err != nil {
		return model.Transmission{}, err
	}

	out, err := h.route.Encap(pktOut)
	if err != nil {
		return model.Transmission{}, err
	}

	localNode, err := h.ipCache.GetNodeFromName(h.nodeInfo.NodeName)
	if err != nil {
		return model.Transmission{}, err
	}

	if h.infraShim != nil {
		suspicions, err := h.infraShim.NodeToNode(localNode, h.iface, node, pktOut)
		if err != nil {
			return model.Transmission{}, err
		}
		h.netNode.Suspicions = append(h.netNode.Suspicions, suspicions...)
	}

	link := &model.Link{
		Type:            model.LinkInfra,
		Source:          h.netNode,
		Packet:          out,
		SourceAttribute: model.SimpleLinkAttribute{Interface: h.iface},
	}
	nextHop := model.Hop{
		Type: model.NetNodeTypeNode,
		ID:   node.Name,
	}
	return model.Transmission{
		NextHop: nextHop,
		Link:    link,
	}, nil
}

// transmissionToExternal builds the transmission that carries pkt to a
// destination outside the cluster via the host interface.
func (h *flannelHost) transmissionToExternal(pkt *model.Packet, _ string) (model.Transmission, error) {
	pktOut := pkt.DeepCopy()
	err := h.masquerade(pktOut)
	if err != nil {
		return model.Transmission{}, err
	}

	err = h.checkRoute(pktOut)
	if err != nil {
		return model.Transmission{}, err
	}

	node, err := h.ipCache.GetNodeFromName(h.nodeInfo.NodeName)
	if err != nil {
		return model.Transmission{}, err
	}

	if h.infraShim != nil {
		suspicions, err := h.infraShim.NodeToExternal(node, h.iface, pktOut)
		if err != nil {
			return model.Transmission{}, err
		}
		h.netNode.Suspicions = append(h.netNode.Suspicions, suspicions...)
	}

	link := &model.Link{
		Type:            model.LinkInfra,
		Source:          h.netNode,
		Packet:          pktOut,
		SourceAttribute: model.SimpleLinkAttribute{Interface: h.iface},
	}
	nextHop := model.Hop{
		Type: model.NetNodeTypeExternal,
		ID:   pktOut.Dst.String(),
	}
	return model.Transmission{
		NextHop: nextHop,
		Link:    link,
	}, nil
}

// ToPod sends or forwards traffic from this host towards pod.
func (h *flannelHost) ToPod(upstream *model.Link, dst model.Endpoint, protocol model.Protocol, pod *v1.Pod) ([]model.Transmission, error) {
	makeLink := func(pkt *model.Packet, iif string) (model.Transmission, error) {
		return h.transmissionToPod(pkt, pod, iif)
	}
	return h.to(upstream, dst, protocol, makeLink)
}

// ToHost sends or forwards traffic from this host towards node.
func (h *flannelHost) ToHost(upstream *model.Link, dst model.Endpoint, protocol model.Protocol, node *v1.Node) ([]model.Transmission, error) {
	makeLink := func(pkt *model.Packet, iif string) (model.Transmission, error) {
		return h.transmissionToNode(pkt, node, iif)
	}
	return h.to(upstream, dst, protocol, makeLink)
}

// ToService resolves traffic addressed to service into one transmission per
// backend returned by the service processor. With a nil upstream the packet
// originates on this host and its source address is taken from the routing
// table. A fatal suspicion and a CannotBuildTransmissionError are produced
// when the service has no valid endpoint.
func (h *flannelHost) ToService(upstream *model.Link, dst model.Endpoint, protocol model.Protocol, service *v1.Service) ([]model.Transmission, error) {
	iif := h.detectIif(upstream)

	var pkt *model.Packet
	if upstream != nil {
		upstream.DestinationAttribute = model.SimpleLinkAttribute{Interface: iif}
		pkt = upstream.Packet
	} else {
		pkt = &model.Packet{
			Dst:      net.ParseIP(dst.IP),
			Dport:    dst.Port,
			Protocol: protocol,
		}
		src, _, err := h.nodeInfo.Router.RouteSrc(pkt, "", "")
		if err != nil {
			if err == netstack.ErrNoRouteToHost {
				h.netNode.AddSuspicion(model.SuspicionLevelFatal, fmt.Sprintf("no route to host: %v", dst))
				return nil, &assertions.CannotBuildTransmissionError{
					SrcNode: h.netNode,
					Err:     fmt.Errorf("no route to host: %v", dst),
				}
			}
			return nil, err
		}
		pkt.Src = net.ParseIP(src)
	}

	if len(service.Spec.LoadBalancerSourceRanges) > 0 {
		cidrAllow := false
		for _, sourceRange := range service.Spec.LoadBalancerSourceRanges {
			_, cidr, err := net.ParseCIDR(sourceRange)
			if err != nil {
				// cidr is nil on a parse failure; log the offending input.
				klog.Errorf("source range(%s) format invalid, %v", sourceRange, err)
				continue
			}
			if cidr.Contains(pkt.Src) {
				cidrAllow = true
			}
		}
		if !cidrAllow {
			h.netNode.Suspicions = append(h.netNode.Suspicions, model.Suspicion{
				Level: model.SuspicionLevelWarning,
				Message: fmt.Sprintf("service sourcePortRange(%v) not allow source ip(%v) access",
					service.Spec.LoadBalancerSourceRanges, pkt.Src),
			})
		}
	}

	node, err := h.ipCache.GetNodeFromName(h.nodeInfo.NodeName)
	if err != nil {
		return nil, err
	}

	backends := h.serviceProcessor.Process(*pkt, service, node)
	if len(backends) == 0 {
		h.netNode.Suspicions = append(h.netNode.Suspicions, model.Suspicion{
			Level:   model.SuspicionLevelFatal,
			Message: fmt.Sprintf("service %s/%s has no valid endpoint", service.Namespace, service.Name),
		})
		h.netNode.DoAction(model.ActionService(upstream, []*model.Link{}))
		return nil, &assertions.CannotBuildTransmissionError{
			SrcNode: h.netNode,
			Err:     fmt.Errorf("service %s/%s has no valid endpoint", service.Namespace, service.Name),
		}
	}

	// A validation failure is recorded as a suspicion; processing continues.
	if err := h.serviceProcessor.Validate(*pkt, backends, h.nodeInfo.NetNS); err != nil {
		h.netNode.Suspicions = append(h.netNode.Suspicions, model.Suspicion{
			Level:   model.SuspicionLevelFatal,
			Message: fmt.Sprintf("validate endpoint of service %s/%s failed: %s", service.Namespace, service.Name, err),
		})
	}

	// Forwarded traffic and locally originated traffic are checked against
	// different netfilter assertions.
	var nfAssertion func(pktIn model.Packet, pktOut []model.Packet, iif string)
	if upstream != nil {
		nfAssertion = h.net.AssertNetfilterForward
	} else {
		nfAssertion = h.net.AssertNetfilterSend
	}

	var transmissions []model.Transmission
	for _, backend := range backends {
		pktOut := &model.Packet{
			Src:      pkt.Src,
			Sport:    pkt.Sport,
			Dst:      net.ParseIP(backend.IP),
			Dport:    backend.Port,
			Protocol: protocol,
		}

		if backend.Masquerade {
			ip, _, err := h.nodeInfo.Router.RouteSrc(pktOut, "", "")
			if err != nil {
				return nil, err
			}
			pktOut.Src = net.ParseIP(ip)
		}

		backendType, err := h.ipCache.GetIPType(backend.IP)
		if err != nil {
			return nil, err
		}

		switch backendType {
		case model.EndpointTypePod:
			pod, err := h.ipCache.GetPodFromIP(backend.IP)
			if err != nil {
				return nil, err
			}
			if pod == nil {
				return nil, fmt.Errorf("cannot find pod from ip %s", backend.IP)
			}
			transmission, err := h.transmissionToPod(pktOut, pod, iif)
			if err != nil {
				return nil, err
			}
			transmissions = append(transmissions, transmission)
		case model.EndpointTypeNode:
			node, err := h.ipCache.GetNodeFromIP(backend.IP)
			if err != nil {
				return nil, err
			}
			if node == nil {
				return nil, fmt.Errorf("cannot find node from ip %s", backend.IP)
			}
			transmission, err := h.transmissionToNode(pktOut, node, iif)
			if err != nil {
				return nil, err
			}
			transmissions = append(transmissions, transmission)
		default:
			transmission, err := h.transmissionToExternal(pktOut, iif)
			if err != nil {
				return nil, err
			}
			transmissions = append(transmissions, transmission)
		}
	}

	links := lo.Map(transmissions, func(t model.Transmission, _ int) *model.Link { return t.Link })
	h.netNode.DoAction(model.ActionService(upstream, links))

	pktOutList := lo.Map(links, func(l *model.Link, _ int) model.Packet { return *l.Packet })
	nfAssertion(*pkt, pktOutList, iif)

	return transmissions, nil
}

// ToExternal sends or forwards traffic from this host to a destination
// outside the cluster.
func (h *flannelHost) ToExternal(upstream *model.Link, dst model.Endpoint, protocol model.Protocol) ([]model.Transmission, error) {
	makeLink := func(pkt *model.Packet, iif string) (model.Transmission, error) {
		return h.transmissionToExternal(pkt, iif)
	}
	return h.to(upstream, dst, protocol, makeLink)
}

// to is the shared implementation of ToPod, ToHost and ToExternal. With an
// upstream link the (decapsulated) upstream packet is forwarded; without one a
// new packet is originated on this host with a source address chosen by the
// routing table. It returns (nil, nil) when transmit yields no link.
func (h *flannelHost) to(upstream *model.Link, dst model.Endpoint, protocol model.Protocol, transmit transmissionFunc) ([]model.Transmission, error) {
	iif := h.detectIif(upstream)

	var action *model.Action
	var transmission model.Transmission
	var pkt *model.Packet
	var nfAssertionFunc func(pktIn model.Packet, pktOut []model.Packet, iif string)

	if upstream != nil {
		if upstream.Type == model.LinkVeth {
			// send from local pod, assert veth
			if attr, ok := upstream.SourceAttribute.(model.VEthLinkAttribute); ok {
				h.net.AssertVEthOnBridge(attr.PeerIndex, h.bridge)
			}
		}
		upstream.DestinationAttribute = model.SimpleLinkAttribute{Interface: iif}

		var err error
		pkt, err = h.route.Decap(upstream.Packet)
		if err != nil {
			return nil, err
		}

		transmission, err = transmit(pkt, iif)
		if err != nil {
			return nil, err
		}
		if transmission.Link == nil {
			return nil, nil
		}

		nfAssertionFunc = h.net.AssertNetfilterForward
		action = model.ActionForward(upstream, []*model.Link{transmission.Link})
	} else {
		pkt = &model.Packet{
			Dst:      net.ParseIP(dst.IP),
			Dport:    dst.Port,
			Protocol: protocol,
		}
		addr, _, err := h.nodeInfo.Router.RouteSrc(pkt, "", "")
		if err != nil {
			return nil, err
		}
		pkt.Src = net.ParseIP(addr)

		transmission, err = transmit(pkt, iif)
		if err != nil {
			return nil, err
		}
		if transmission.Link == nil {
			return nil, nil
		}

		nfAssertionFunc = h.net.AssertNetfilterSend
		action = model.ActionSend([]*model.Link{transmission.Link})
	}

	pktOut := transmission.Link.Packet
	nfAssertionFunc(*pkt, []model.Packet{*pktOut}, iif)
	h.netNode.DoAction(action)

	return []model.Transmission{transmission}, nil
}

// Serve handles a packet arriving at this host over upstream. An encapsulated
// packet is unwrapped and handed to ToPod for the pod owning the inner
// destination; a plain packet is treated as terminating on the host, for
// which the return route, netfilter rules and a listening socket are asserted.
func (h *flannelHost) Serve(upstream *model.Link, dst model.Endpoint, protocol model.Protocol) ([]model.Transmission, error) {
	iif := h.detectIif(upstream)

	if upstream.Packet.Encap != nil {
		innerPacket, err := h.route.Decap(upstream.Packet)
		if err != nil {
			return nil, err
		}

		pod, err := h.ipCache.GetPodFromIP(innerPacket.Dst.String())
		if err != nil {
			return nil, err
		}
		if pod == nil {
			return nil, fmt.Errorf("inner packet pod %s not found", innerPacket.Dst)
		}

		return h.ToPod(upstream, model.Endpoint{
			IP:   innerPacket.Dst.String(),
			Type: model.EndpointTypePod,
			Port: innerPacket.Dport,
		}, innerPacket.Protocol, pod)
	}

	upstream.DestinationAttribute = model.SimpleLinkAttribute{Interface: iif}

	err := h.route.Assert(h.net, ack(upstream.Packet))
	if err != nil {
		return nil, err
	}

	h.net.AssertNetfilterServe(*upstream.Packet, iif)
	h.net.AssertListen(net.ParseIP(dst.IP), dst.Port, protocol)
	h.netNode.DoAction(model.ActionServe(upstream))

	return nil, nil
}

// detectIif returns the input interface for traffic arriving over upstream:
// "" when there is no upstream, the bridge for veth links, and the host
// interface for every other link type.
func (h *flannelHost) detectIif(upstream *model.Link) string {
	if upstream == nil {
		return ""
	}
	if upstream.Type == model.LinkVeth {
		return h.bridge
	}
	return h.iface
}

// basicCheck runs the node-level assertions that do not depend on a
// particular packet: the Kubernetes node object, default rules, the host
// interface (up, MTU 1500), the host bridge and the forwarding sysctls.
func (h *flannelHost) basicCheck() error {
	node, err := h.ipCache.GetNodeFromName(h.nodeInfo.NodeName)
	if err != nil {
		return err
	}

	h.k8s.AssertNode(node)
	h.net.AssertDefaultRule()
	h.net.AssertNoPolicyRoute()
	h.net.AssertDefaultAccept()
	h.net.AssertNetDevice(h.iface, netstack.Interface{
		MTU:   1500,
		State: netstack.LinkUP,
	})
	h.net.AssertHostBridge(h.bridge)
	h.net.AssertSysctls(map[string]string{
		"net.bridge.bridge-nf-call-iptables": "1",
		"net.ipv4.ip_forward":                "1",
		fmt.Sprintf("net.ipv4.conf.%s.forwarding", utils.ConvertNICNameInSysctls(h.bridge)): "1",
		fmt.Sprintf("net.ipv4.conf.%s.forwarding", utils.ConvertNICNameInSysctls(h.iface)):  "1",
	}, model.SuspicionLevelFatal)

	return nil
}

// initRoute builds the expected routes of this host (local pod CIDR via the
// bridge, the host interface's own network via the host interface, and a
// default route when a gateway is configured) and creates the flannel route
// helper from them. It fails when the host interface is not present on the
// node.
func (h *flannelHost) initRoute() error {
	interfaceDev, ok := lo.Find(h.nodeInfo.NetNS.Interfaces, func(i netstack.Interface) bool { return i.Name == h.iface })
	if !ok {
		return fmt.Errorf("can not find interface named %s", h.iface)
	}

	ip, mask := netstack.GetDefaultIPv4(&interfaceDev)
	cidr := net.IPNet{
		IP:   ip,
		Mask: mask,
	}

	routes := map[string]assertions.RouteAssertion{
		h.podCIDR.String(): {Dev: &h.bridge, Scope: utils.ToPointer(netstack.ScopeLink)},
		cidr.String():      {Dev: &h.iface, Scope: utils.ToPointer(netstack.ScopeLink)},
	}

	if h.gateway != nil && !h.gateway.IsUnspecified() {
		routes["0.0.0.0/0"] = assertions.RouteAssertion{
			Dev:   &h.iface,
			Scope: utils.ToPointer(netstack.ScopeUniverse),
			Gw:    &h.gateway,
		}
	}

	node, err := h.ipCache.GetNodeFromName(h.nodeInfo.NodeName)
	if err != nil {
		return err
	}

	h.route = newFlannelRoute(routes, h.podCIDR, h.clusterCIDR, h.net, node, h.ipCache, h.cniMode, h.iface)
	return nil
}

// masquerade rewrites pkt.Src in place according to the flannel masquerade
// rules, evaluated in this order (no-op when IP masquerade is disabled):
//
//  1. src in cluster CIDR and dst in cluster CIDR: do nothing
//  2. src in cluster CIDR and dst not in 224.0.0.0/4 (multicast): masquerade
//  3. src not in cluster CIDR and dst in the local pod CIDR: do nothing
//  4. src not in cluster CIDR and dst in cluster CIDR: masquerade
func (h *flannelHost) masquerade(pkt *model.Packet) error {
	if !h.ipMasq {
		return nil
	}

	srcInCluster := h.clusterCIDR.Contains(pkt.Src)
	dstInCluster := h.clusterCIDR.Contains(pkt.Dst)

	switch {
	case srcInCluster && dstInCluster:
		return nil
	case srcInCluster && !flannelMulticastCIDR.Contains(pkt.Dst):
		return h.doMasquerade(pkt)
	case !srcInCluster && h.podCIDR.Contains(pkt.Dst):
		return nil
	case !srcInCluster && dstInCluster:
		return h.doMasquerade(pkt)
	}

	return nil
}

// doMasquerade replaces pkt.Src with the source address the host's routing
// table selects for pkt.
func (h *flannelHost) doMasquerade(pkt *model.Packet) error {
	ip, _, err := h.nodeInfo.Router.RouteSrc(pkt, "", "")
	if err != nil {
		return err
	}
	pkt.Src = net.ParseIP(ip)
	return nil
}

// checkRoute asserts the route for pkt and for the reply built by ack(pkt).
func (h *flannelHost) checkRoute(pkt *model.Packet) error {
	if err := h.route.Assert(h.net, pkt); err != nil {
		return err
	}
	return h.route.Assert(h.net, ack(pkt))
}