pkg/skoop/plugin/calico.go (805 lines of code) (raw):

package plugin import ( "context" "crypto/sha1" "encoding/hex" "encoding/json" "fmt" "net" "strings" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/klog/v2" "github.com/spf13/pflag" "github.com/alibaba/kubeskoop/pkg/skoop/assertions" ctx "github.com/alibaba/kubeskoop/pkg/skoop/context" "github.com/alibaba/kubeskoop/pkg/skoop/k8s" "github.com/alibaba/kubeskoop/pkg/skoop/model" "github.com/alibaba/kubeskoop/pkg/skoop/netstack" "github.com/alibaba/kubeskoop/pkg/skoop/network" "github.com/alibaba/kubeskoop/pkg/skoop/service" "github.com/alibaba/kubeskoop/pkg/skoop/utils" calicov3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3" "github.com/projectcalico/api/pkg/client/clientset_generated/clientset" "github.com/samber/lo" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type CalicoConfig struct { HostMTU int PodMTU int IPIPPodMTU int Interface string } func (c *CalicoConfig) BindFlags(fs *pflag.FlagSet) { fs.StringVarP(&c.Interface, "calico-host-interface", "", "", "Host interface for calico plugin.") fs.IntVarP(&c.HostMTU, "calico-host-mtu", "", 1500, "Host MTU for calico plugin. Host interface MTU in BGP mode.") fs.IntVarP(&c.PodMTU, "calico-pod-mtu", "", 1500, "Pod MTU for calico plugin. Pod interface MTU in BGP mode.") fs.IntVarP(&c.IPIPPodMTU, "calico-ipip-pod-mtu", "", 1480, "Pod MTU for calico plugin. Pod interface MTU in IPIP mode.") } func (c *CalicoConfig) Validate() error { return nil } var Calico = &CalicoConfig{} func init() { ctx.RegisterConfigBinder("Calico plugin", Calico) } type CalicoNetworkMode string const ( CalicoNetworkModelBGP CalicoNetworkMode = "BGP" CalicoNetworkModeIPIP CalicoNetworkMode = "IPIP" CalicoNetworkModeVXLan CalicoNetworkMode = "VXLan" CalicoTunnelInterface = "tunl0" ) type CalicoPluginOptions struct { InfraShim network.InfraShim HostMTU int PodMTU int IPIPPodMTU int ServiceProcessor service.Processor Interface string } type calicoPlugin struct { serviceProcessor service.Processor podMTU int ipipPodMTU int infraShim network.InfraShim ipCache *k8s.IPCache hostOptions *calicoHostOptions } const ( CalicoDefaultInterfacePrefix = "cali" ) func calicoVethName(namespace, name string) string { h := sha1.New() h.Write([]byte(fmt.Sprintf("%s.%s", namespace, name))) return fmt.Sprintf("%s%s", CalicoDefaultInterfacePrefix, hex.EncodeToString(h.Sum(nil))[:11]) } func getIPPool(ipPools []calicov3.IPPool, ip net.IP) *calicov3.IPPool { if ip == nil { return nil } matchedPool, ok := lo.Find(ipPools, func(pool calicov3.IPPool) bool { _, ipNet, _ := net.ParseCIDR(pool.Spec.CIDR) return ipNet.Contains(ip) }) if !ok { return nil } return &matchedPool } func listIPPools(ctx *ctx.Context) ([]calicov3.IPPool, error) { client, err := clientset.NewForConfig(ctx.KubernetesRestClient()) if err != nil { return nil, err } ippools, err := client.ProjectcalicoV3().IPPools().List(context.TODO(), metav1.ListOptions{}) if err == nil { return ippools.Items, nil } klog.V(5).Infof("not able to list projectcalico.org/v3 ippools, error %s, fallback to list crd.", err) dynClient, err := dynamic.NewForConfig(ctx.KubernetesRestClient()) if err != nil { return nil, err } gvr := schema.GroupVersionResource{ Group: "crd.projectcalico.org", Version: "v1", Resource: "ippools", } list, err := dynClient.Resource(gvr).List(context.TODO(), metav1.ListOptions{}) if err != nil { return nil, err } str, err := list.MarshalJSON() if err != nil { return nil, err } var ippoolList calicov3.IPPoolList err = json.Unmarshal(str, &ippoolList) if err != nil { return nil, err } return ippools.Items, err } func (c *calicoPlugin) CreatePod(pod *k8s.Pod) (model.NetNodeAction, error) { mtu := c.podMTU k8sPod, err := c.ipCache.GetPodFromName(pod.Namespace, pod.PodName) if err != nil { return nil, err } if k8sPod == nil { return nil, fmt.Errorf("cannot find pod %s/%s", pod.Namespace, pod.PodName) } pool := getIPPool(c.hostOptions.IPPools, net.ParseIP(k8sPod.Status.PodIP)) if pool != nil { if pool.Spec.IPIPMode != calicov3.IPIPModeNever { mtu = c.ipipPodMTU } } return newSimpleVEthPod(pod, c.ipCache, mtu, "eth0") } func (c *calicoPlugin) CreateNode(node *k8s.NodeInfo) (model.NetNodeAction, error) { calicoHost, err := newCalicoHost(c.ipCache, node, c.infraShim, c.serviceProcessor, c.hostOptions) if err != nil { return nil, err } return &BasePluginNode{ NetNode: calicoHost.netNode, IPCache: c.ipCache, SimplePluginNode: calicoHost, }, nil } type calicoRoute struct { *route iface string ipCache *k8s.IPCache nodeName string ipPools []calicov3.IPPool localNet *assertions.NetstackAssertion network *net.IPNet } func newCalicoRoute(parentRoute map[string]assertions.RouteAssertion, ipCache *k8s.IPCache, iface string, nodeName string, ipPools []calicov3.IPPool, localNetAssertion *assertions.NetstackAssertion, network *net.IPNet) *calicoRoute { route := &calicoRoute{ route: newRoute(parentRoute), iface: iface, ipCache: ipCache, nodeName: nodeName, ipPools: ipPools, localNet: localNetAssertion, network: network, } return route } func (r *calicoRoute) getNetworkModeFromIPPool(pool *calicov3.IPPool, dstNodeIP net.IP) CalicoNetworkMode { if pool == nil { return CalicoNetworkModelBGP } switch pool.Spec.IPIPMode { case "Always": return CalicoNetworkModeIPIP case "CrossSubnet": if r.network.Contains(dstNodeIP) { return CalicoNetworkModelBGP } return CalicoNetworkModeIPIP case "Never": return CalicoNetworkModelBGP default: return CalicoNetworkModelBGP } } func (r *calicoRoute) AssertLocalPodRoute(pkt *model.Packet, dstPod *v1.Pod) error { vethName := calicoVethName(dstPod.Namespace, dstPod.Name) assertion := assertions.RouteAssertion{ Dev: &vethName, Type: utils.ToPointer(netstack.RtnUnicast), Scope: utils.ToPointer(netstack.ScopeLink), } return r.localNet.AssertRoute(assertion, *pkt, "", "") } func (r *calicoRoute) AssertRemoteRoute(pkt *model.Packet, networkMode CalicoNetworkMode, dstPod *v1.Pod) error { assertion := assertions.RouteAssertion{ Scope: utils.ToPointer(netstack.ScopeUniverse), } if dstPod != nil { hostIP := net.ParseIP(dstPod.Status.HostIP) assertion.Gw = &hostIP } switch networkMode { case CalicoNetworkModelBGP: assertion.Dev = &r.iface case CalicoNetworkModeIPIP: assertion.Dev = utils.ToPointer(CalicoTunnelInterface) assertion.Protocol = utils.ToPointer(netstack.RTProtBIRD) } return r.localNet.AssertRoute(assertion, *pkt, "", "") } func (r *calicoRoute) Assert(pkt *model.Packet) error { ipPool := getIPPool(r.ipPools, pkt.Dst) if ipPool == nil { return r.route.Assert(r.localNet, pkt) } var hostIP net.IP pod, err := r.ipCache.GetPodFromIP(pkt.Dst.String()) if err != nil { return err } if pod != nil { if pod.Spec.NodeName == r.nodeName { return r.AssertLocalPodRoute(pkt, pod) } hostIP = net.ParseIP(pod.Status.HostIP) } networkMode := r.getNetworkModeFromIPPool(ipPool, hostIP) return r.AssertRemoteRoute(pkt, networkMode, pod) } func NewCalicoPluginWithOptions(ctx *ctx.Context, options *CalicoPluginOptions) (Plugin, error) { if options.ServiceProcessor == nil { return nil, fmt.Errorf("service processor must be provided") } ippools, err := listIPPools(ctx) if err != nil { return nil, err } return &calicoPlugin{ infraShim: options.InfraShim, podMTU: options.PodMTU, ipipPodMTU: options.IPIPPodMTU, ipCache: ctx.ClusterConfig().IPCache, serviceProcessor: options.ServiceProcessor, hostOptions: &calicoHostOptions{ Interface: options.Interface, IPPools: ippools, MTU: options.HostMTU, }, }, nil } func NewCalicoPlugin(ctx *ctx.Context, serviceProcessor service.Processor, infraShim network.InfraShim) (Plugin, error) { options := &CalicoPluginOptions{ InfraShim: infraShim, PodMTU: Calico.PodMTU, HostMTU: Calico.HostMTU, IPIPPodMTU: Calico.IPIPPodMTU, ServiceProcessor: serviceProcessor, Interface: Calico.Interface, } return NewCalicoPluginWithOptions(ctx, options) } type calicoHostOptions struct { Interface string MTU int Gateway net.IP IPPools []calicov3.IPPool } type calicoHost struct { netNode *model.NetNode nodeInfo *k8s.NodeInfo iface string mtu int ipCache *k8s.IPCache infraShim network.InfraShim serviceProcessor service.Processor network *net.IPNet gateway net.IP ipPools []calicov3.IPPool net *assertions.NetstackAssertion k8s *assertions.KubernetesAssertion route *calicoRoute } func newCalicoHost(ipCache *k8s.IPCache, nodeInfo *k8s.NodeInfo, infraShim network.InfraShim, serviceProcessor service.Processor, options *calicoHostOptions) (*calicoHost, error) { netNode := model.NewNetNode(nodeInfo.NodeName, model.NetNodeTypeNode) assertion := assertions.NewNetstackAssertion(netNode, &nodeInfo.NetNS) k8sAssertion := assertions.NewKubernetesAssertion(netNode) host := &calicoHost{ netNode: netNode, nodeInfo: nodeInfo, iface: options.Interface, mtu: options.MTU, ipCache: ipCache, serviceProcessor: serviceProcessor, gateway: options.Gateway, ipPools: options.IPPools, net: assertion, k8s: k8sAssertion, infraShim: infraShim, } if host.iface == "" { host.iface = netstack.LookupDefaultIfaceName(nodeInfo.NetNSInfo.Interfaces) if host.iface == "" { return nil, fmt.Errorf("cannot lookup default host interface, please manually specify it via --calico-host-interface") } klog.V(5).Infof("detected host interface %s on node %s", host.iface, nodeInfo.NodeName) } iface, ok := lo.Find(nodeInfo.Interfaces, func(i netstack.Interface) bool { return i.Name == host.iface }) if !ok { return nil, fmt.Errorf("cannot find interface %s", options.Interface) } ip, mask := netstack.GetDefaultIPv4(&iface) ipNet := &net.IPNet{IP: ip, Mask: mask} host.network = ipNet err := host.initRoute() if err != nil { return nil, err } err = host.basicCheck() if err != nil { return nil, err } return host, nil } func (h *calicoHost) initRoute() error { routes := map[string]assertions.RouteAssertion{ h.network.String(): {Dev: &h.iface, Scope: utils.ToPointer(netstack.ScopeLink)}, } if h.gateway != nil && !h.gateway.IsUnspecified() { routes["0.0.0.0/0"] = assertions.RouteAssertion{ Dev: &h.iface, Scope: utils.ToPointer(netstack.ScopeUniverse), Gw: &h.gateway, } } h.route = newCalicoRoute(routes, h.ipCache, h.iface, h.nodeInfo.NodeName, h.ipPools, h.net, h.network) return nil } func (h *calicoHost) basicCheck() error { h.net.AssertDefaultRule() h.net.AssertNoPolicyRoute() h.net.AssertNetDevice(h.iface, netstack.Interface{MTU: h.mtu, State: netstack.LinkUP}) h.net.AssertSysctls(map[string]string{ "net.ipv4.ip_forward": "1", fmt.Sprintf("net.ipv4.conf.%s.forwarding", utils.ConvertNICNameInSysctls(h.iface)): "1", }, model.SuspicionLevelFatal) return nil } func (h *calicoHost) transmissionToPod(pkt *model.Packet, pod *v1.Pod, iif string) (model.Transmission, error) { if !pod.Spec.HostNetwork && pod.Spec.NodeName == h.nodeInfo.NodeName { // send to local pod err := h.checkRoute(pkt) if err != nil { return model.Transmission{}, err } ifName := calicoVethName(pod.Namespace, pod.Name) h.assertInterface(ifName) pktOut := pkt.DeepCopy() link := &model.Link{ Type: model.LinkVeth, Source: h.netNode, Packet: pktOut, SourceAttribute: model.SimpleLinkAttribute{Interface: ifName}, } nextHop := model.Hop{ Type: model.NetNodeTypePod, ID: pkt.Dst.String(), } return model.Transmission{ NextHop: nextHop, Link: link, }, nil } node, err := h.ipCache.GetNodeFromIP(pod.Status.HostIP) if err != nil { return model.Transmission{}, err } return h.transmissionToNode(pkt, node, iif) } func (h *calicoHost) transmissionToNode(pkt *model.Packet, node *v1.Node, _ string) (model.Transmission, error) { err := h.checkRoute(pkt) if err != nil { return model.Transmission{}, err } pktOut := pkt.DeepCopy() err = h.doMasquerade(pktOut) if err != nil { return model.Transmission{}, nil } if h.infraShim != nil { dstNode, err := h.ipCache.GetNodeFromName(h.nodeInfo.NodeName) if err != nil { return model.Transmission{}, err } suspicions, err := h.infraShim.NodeToNode(dstNode, h.iface, node, pktOut) if err != nil { return model.Transmission{}, err } h.netNode.Suspicions = append(h.netNode.Suspicions, suspicions...) } ipPool := getIPPool(h.ipPools, pkt.Dst) networkMode := h.route.getNetworkModeFromIPPool(ipPool, pkt.Dst) oif := h.iface if networkMode == CalicoNetworkModeIPIP { oif = CalicoTunnelInterface h.assertInterface(oif) } // encap packet pktOut, err = h.Encap(pktOut, node, networkMode) if err != nil { return model.Transmission{}, err } link := &model.Link{ Type: model.LinkInfra, Source: h.netNode, Packet: pktOut, SourceAttribute: model.SimpleLinkAttribute{Interface: oif}, } nextHop := model.Hop{ Type: model.NetNodeTypeNode, ID: node.Name, } return model.Transmission{ NextHop: nextHop, Link: link, }, nil } func (h *calicoHost) transmissionToExternal(pkt *model.Packet, _ string) (model.Transmission, error) { err := h.checkRoute(pkt) if err != nil { return model.Transmission{}, err } pktOut := pkt.DeepCopy() err = h.masquerade(pktOut) if err != nil { return model.Transmission{}, err } link := &model.Link{ Type: model.LinkInfra, Source: h.netNode, Packet: pktOut, SourceAttribute: model.SimpleLinkAttribute{Interface: h.iface}, } nextHop := model.Hop{ Type: model.NetNodeTypeExternal, ID: pktOut.Dst.String(), } return model.Transmission{ NextHop: nextHop, Link: link, }, nil } func (h *calicoHost) to(upstream *model.Link, dst model.Endpoint, protocol model.Protocol, transmit transmissionFunc) ([]model.Transmission, error) { iif, err := h.detectIif(upstream) if err != nil { return nil, err } var action *model.Action var transmission model.Transmission var pkt *model.Packet var nfAssertionFunc func(pktIn model.Packet, pktOut []model.Packet, iif string) if upstream != nil { if upstream.Type == model.LinkVeth { h.assertInterface(iif) } upstream.Destination = h.netNode upstream.DestinationAttribute = model.SimpleLinkAttribute{Interface: iif} pkt = h.Decap(upstream.Packet) transmission, err = transmit(pkt, iif) if err != nil { return nil, err } if transmission.Link == nil { return nil, nil } nfAssertionFunc = h.net.AssertNetfilterForward action = model.ActionForward(upstream, []*model.Link{transmission.Link}) } else { pkt = &model.Packet{ Dst: net.ParseIP(dst.IP), Dport: dst.Port, Protocol: protocol, } addr, _, err := h.nodeInfo.Router.RouteSrc(pkt, "", "") if err != nil { if err == netstack.ErrNoRouteToHost { h.netNode.AddSuspicion(model.SuspicionLevelFatal, fmt.Sprintf("no route to host: %v", dst)) return nil, &assertions.CannotBuildTransmissionError{ SrcNode: h.netNode, Err: fmt.Errorf("no route to host: %v", err), } } return nil, err } pkt.Src = net.ParseIP(addr) transmission, err = transmit(pkt, iif) if err != nil { return nil, err } if transmission.Link == nil { return nil, nil } nfAssertionFunc = h.net.AssertNetfilterSend action = model.ActionSend([]*model.Link{transmission.Link}) } pktOut := transmission.Link.Packet nfAssertionFunc(*pkt, []model.Packet{*pktOut}, iif) h.netNode.DoAction(action) return []model.Transmission{transmission}, nil } func (h *calicoHost) ToPod(upstream *model.Link, dst model.Endpoint, protocol model.Protocol, pod *v1.Pod) ([]model.Transmission, error) { makeLink := func(pkt *model.Packet, iif string) (model.Transmission, error) { return h.transmissionToPod(pkt, pod, iif) } return h.to(upstream, dst, protocol, makeLink) } func (h *calicoHost) ToHost(upstream *model.Link, dst model.Endpoint, protocol model.Protocol, node *v1.Node) ([]model.Transmission, error) { makeLink := func(pkt *model.Packet, iif string) (model.Transmission, error) { return h.transmissionToNode(pkt, node, iif) } return h.to(upstream, dst, protocol, makeLink) } func (h *calicoHost) ToService(upstream *model.Link, dst model.Endpoint, protocol model.Protocol, service *v1.Service) ([]model.Transmission, error) { iif, err := h.detectIif(upstream) if err != nil { return nil, err } var pkt *model.Packet if upstream != nil { upstream.DestinationAttribute = model.SimpleLinkAttribute{Interface: iif} pkt = upstream.Packet } else { pkt = &model.Packet{ Dst: net.ParseIP(dst.IP), Dport: dst.Port, Protocol: protocol, } src, _, err := h.nodeInfo.Router.RouteSrc(pkt, "", "") if err != nil { return nil, err } pkt.Src = net.ParseIP(src) } node, err := h.ipCache.GetNodeFromName(h.nodeInfo.NodeName) if err != nil { return nil, err } if len(service.Spec.LoadBalancerSourceRanges) > 0 { cidrAllow := false for _, sourceRange := range service.Spec.LoadBalancerSourceRanges { _, cidr, err := net.ParseCIDR(sourceRange) if err != nil { klog.Errorf("source range(%s) format invalid, %v", cidr, err) continue } if cidr.Contains(pkt.Src) { cidrAllow = true } } if !cidrAllow { h.netNode.Suspicions = append(h.netNode.Suspicions, model.Suspicion{ Level: model.SuspicionLevelWarning, Message: fmt.Sprintf("service sourcePortRange(%v) not allow source ip(%v) access", service.Spec.LoadBalancerSourceRanges, pkt.Src), }) } } backends := h.serviceProcessor.Process(*pkt, service, node) if len(backends) == 0 { h.netNode.Suspicions = append(h.netNode.Suspicions, model.Suspicion{ Level: model.SuspicionLevelFatal, Message: fmt.Sprintf("service %s/%s has no valid endpoint", service.Namespace, service.Name), }) h.netNode.DoAction(model.ActionService(upstream, []*model.Link{})) return nil, &assertions.CannotBuildTransmissionError{ SrcNode: h.netNode, Err: fmt.Errorf("service %s/%s has no valid endpoint", service.Namespace, service.Name), } } if err := h.serviceProcessor.Validate(*pkt, backends, h.nodeInfo.NetNS); err != nil { h.netNode.Suspicions = append(h.netNode.Suspicions, model.Suspicion{ Level: model.SuspicionLevelFatal, Message: fmt.Sprintf("validate endpoint of service %s/%s failed: %s", service.Namespace, service.Name, err), }) } var nfAssertion func(pktIn model.Packet, pktOut []model.Packet, iif string) if upstream != nil { nfAssertion = h.net.AssertNetfilterForward } else { nfAssertion = h.net.AssertNetfilterSend } var transmissions []model.Transmission for _, backend := range backends { pktOut := &model.Packet{ Src: pkt.Src, Sport: pkt.Sport, Dst: net.ParseIP(backend.IP), Dport: backend.Port, Protocol: protocol, } if backend.Masquerade { ip, _, err := h.nodeInfo.Router.RouteSrc(pktOut, "", "") if err != nil { return nil, err } pktOut.Src = net.ParseIP(ip) } backendType, err := h.ipCache.GetIPType(backend.IP) if err != nil { return nil, err } switch backendType { case model.EndpointTypePod: pod, err := h.ipCache.GetPodFromIP(backend.IP) if err != nil { return nil, err } if pod == nil { return nil, fmt.Errorf("cannot find pod from ip %s", backend.IP) } transmission, err := h.transmissionToPod(pktOut, pod, iif) if err != nil { return nil, err } transmissions = append(transmissions, transmission) case model.EndpointTypeNode: node, err := h.ipCache.GetNodeFromIP(backend.IP) if err != nil { return nil, err } if node == nil { return nil, fmt.Errorf("cannot find node from ip %s", backend.IP) } transmission, err := h.transmissionToNode(pktOut, node, iif) if err != nil { return nil, err } transmissions = append(transmissions, transmission) default: transmission, err := h.transmissionToExternal(pktOut, iif) if err != nil { return nil, err } transmissions = append(transmissions, transmission) } } links := lo.Map(transmissions, func(t model.Transmission, _ int) *model.Link { return t.Link }) h.netNode.DoAction(model.ActionService(upstream, links)) pktOutList := lo.Map(links, func(l *model.Link, _ int) model.Packet { return *l.Packet }) nfAssertion(*pkt, pktOutList, iif) return transmissions, nil } func (h *calicoHost) ToExternal(upstream *model.Link, dst model.Endpoint, protocol model.Protocol) ([]model.Transmission, error) { makeLink := func(pkt *model.Packet, iif string) (model.Transmission, error) { return h.transmissionToExternal(pkt, iif) } return h.to(upstream, dst, protocol, makeLink) } func (h *calicoHost) Serve(upstream *model.Link, dst model.Endpoint, protocol model.Protocol) ([]model.Transmission, error) { iif, err := h.detectIif(upstream) if err != nil { return nil, err } if upstream.Packet.Encap != nil { h.net.AssertNetfilterServe(*upstream.Packet, iif) innerPacket := h.Decap(upstream.Packet) if err != nil { return nil, err } pod, err := h.ipCache.GetPodFromIP(innerPacket.Dst.String()) if err != nil { return nil, err } if pod == nil { return nil, fmt.Errorf("inner packet pod %s not found", innerPacket.Dst) } return h.ToPod(upstream, model.Endpoint{ IP: innerPacket.Dst.String(), Type: model.EndpointTypePod, Port: innerPacket.Dport, }, innerPacket.Protocol, pod) } err = h.route.Assert(ack(upstream.Packet)) if err != nil { return nil, err } upstream.DestinationAttribute = model.SimpleLinkAttribute{Interface: iif} h.net.AssertNetfilterServe(*upstream.Packet, iif) h.net.AssertListen(net.ParseIP(dst.IP), dst.Port, protocol) h.netNode.DoAction(model.ActionServe(upstream)) return nil, nil } func (h *calicoHost) checkRoute(pkt *model.Packet) error { err := h.route.Assert(pkt) if err != nil { return err } err = h.route.Assert(ack(pkt)) if err != nil { return err } return nil } func (h *calicoHost) assertInterface(ifName string) { if ifName == CalicoTunnelInterface { h.net.AssertDefaultIPIPTunnel(ifName) } h.net.AssertNetDevice(ifName, netstack.Interface{State: netstack.LinkUP}) h.net.AssertSysctls(map[string]string{ fmt.Sprintf("net.ipv4.conf.%s.forwarding", utils.ConvertNICNameInSysctls(ifName)): "1", }, model.SuspicionLevelWarning) } func (h *calicoHost) Encap(pkt *model.Packet, nextNode *v1.Node, mode CalicoNetworkMode) (*model.Packet, error) { if mode == CalicoNetworkModeIPIP { nextNodeIP := nextNode.Annotations["projectcalico.org/IPv4Address"] if nextNodeIP == "" { return nil, fmt.Errorf("node %q does not have projectcalico.org/IPv4Address annotation", nextNode.Name) } ip, _, err := net.ParseCIDR(nextNodeIP) if err != nil { return nil, err } pktOut := &model.Packet{ Src: h.network.IP, Dst: ip, Dport: pkt.Dport, Protocol: model.IPv4, Encap: pkt, } return pktOut, nil } return pkt, nil } func (h *calicoHost) Decap(pkt *model.Packet) *model.Packet { if pkt.Encap != nil { return pkt.Encap } return pkt } func (h *calicoHost) masquerade(pkt *model.Packet) error { ippoolSrc := getIPPool(h.ipPools, pkt.Src) if ippoolSrc == nil { return nil } if !ippoolSrc.Spec.NATOutgoing { return nil } ipPoolDst := getIPPool(h.ipPools, pkt.Dst) if ipPoolDst != nil { return nil } node, err := h.ipCache.GetNodeFromIP(pkt.Dst.String()) if err != nil { return err } if node != nil { return nil } return h.doMasquerade(pkt) } func (h *calicoHost) doMasquerade(pkt *model.Packet) error { srcPool := getIPPool(h.ipPools, pkt.Src) if srcPool == nil { return nil } if !srcPool.Spec.NATOutgoing { return nil } dstPool := getIPPool(h.ipPools, pkt.Dst) if dstPool != nil { return nil } ip, _, err := h.nodeInfo.Router.RouteSrc(pkt, "", "") if err != nil { return err } pkt.Src = net.ParseIP(ip) return nil } func (h *calicoHost) detectIif(upstream *model.Link) (string, error) { if upstream == nil { return "", nil } if upstream.Type == model.LinkVeth { // fixme: should not depend on the format of pod id names := strings.Split(upstream.Source.GetID(), "/") pod, err := h.ipCache.GetPodFromName(names[0], names[1]) if err != nil { return "", err } return calicoVethName(pod.Namespace, pod.Name), nil } if upstream.Packet.Encap != nil && upstream.Packet.Protocol == model.IPv4 { return CalicoTunnelInterface, nil } return h.iface, nil }