func()

in pkg/skoop/network/aliyun/node.go [66:199]


// Receive handles a packet that reaches the load balancer node through
// upstream and returns the transmissions from the load balancer to its
// backends.
//
// The service is looked up from the packet's destination IP and its backends
// are computed by n.processor. Receive fails with a
// CannotBuildTransmissionError when the service has no backend, has more than
// maxBackends backends, or exposes no port matching the packet's destination
// port and protocol. When the service uses the Local external traffic policy,
// or the network plugin is terway, one transmission is built per backend (per
// node for pod backends when the plugin is not terway); otherwise the
// transmissions come from getTransmissionsToNodePort. Suspicions returned by
// n.infraShim.ExternalToLoadBalancer are appended to the node before the
// service action is recorded.
func (n *slbNode) Receive(upstream *model.Link) ([]model.Transmission, error) {
	// Upper bound on the number of backends that are expanded into
	// transmissions; above it the node records a suspicion and stops.
	const maxBackends = 10

	upstream.Destination = n.netNode
	pkt := upstream.Packet
	svc, err := n.ipCache.GetServiceFromIP(pkt.Dst.String())
	if err != nil {
		return nil, err
	}
	// Guard against a lookup that reports "not found" as (nil, nil); svc is
	// dereferenced unconditionally below.
	if svc == nil {
		return nil, fmt.Errorf("cannot find service for ip %s", pkt.Dst.String())
	}

	backends := n.processor.Process(*pkt, svc, nil)
	if len(backends) == 0 {
		m := fmt.Sprintf("service %s/%s has no valid endpoint", svc.Namespace, svc.Name)
		n.netNode.AddSuspicion(model.SuspicionLevelFatal, m)
		n.netNode.DoAction(model.ActionService(nil, []*model.Link{}))
		return nil, &assertions.CannotBuildTransmissionError{
			SrcNode: n.netNode,
			Err:     errors.New(m),
		}
	}
	if len(backends) > maxBackends {
		m := fmt.Sprintf("too many backends: %d, stop.", len(backends))
		n.netNode.AddSuspicion(model.SuspicionLevelInfo, m)
		n.netNode.DoAction(model.ActionService(nil, []*model.Link{}))
		return nil, &assertions.CannotBuildTransmissionError{
			SrcNode: n.netNode,
			Err:     errors.New(m),
		}
	}

	// The packet must target a port/protocol pair declared on the service;
	// the protocol comparison is case-insensitive.
	if !lo.ContainsBy(svc.Spec.Ports, func(p v1.ServicePort) bool {
		return p.Port == int32(pkt.Dport) && strings.EqualFold(string(p.Protocol), string(pkt.Protocol))
	}) {
		m := fmt.Sprintf("cannot find port %d protocol %s on service %s/%s",
			pkt.Dport, pkt.Protocol, svc.Namespace, svc.Name)
		n.netNode.AddSuspicion(model.SuspicionLevelFatal, m)
		return nil, &assertions.CannotBuildTransmissionError{
			SrcNode: n.netNode,
			Err:     errors.New(m),
		}
	}

	nodePort := service.GetNodePort(svc, pkt.Dport, pkt.Protocol)
	targetPort := service.GetTargetPort(svc, pkt.Dport, pkt.Protocol)

	var transmission []model.Transmission
	var lbBackends []network.LoadBalancerBackend
	// Names of nodes that already received a transmission (non-terway only).
	nodeMap := map[string]struct{}{}
	if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal || n.plugin == ctx.NetworkPluginTerway {
		// terway plugin will add pod ip directly to the loadbalancer backend
		for _, b := range backends {
			ipType, err := n.ipCache.GetIPType(b.IP)
			if err != nil {
				return nil, err
			}

			nextHop := model.Hop{
				Type: model.NetNodeTypeNode,
			}
			// Work on a per-backend copy so the incoming packet, which is
			// still needed after the loop, is left untouched.
			backendPkt := pkt.DeepCopy()
			backendPkt.Dst = net.ParseIP(b.IP)
			backendPkt.Dport = targetPort

			// node stays nil when the backend IP is neither a pod nor a node.
			var node *v1.Node
			switch ipType {
			case model.EndpointTypePod:
				pod, err := n.ipCache.GetPodFromIP(b.IP)
				if err != nil {
					return nil, err
				}
				if pod == nil {
					return nil, fmt.Errorf("cannot find pod for ip %s", b.IP)
				}
				node, err = n.ipCache.GetNodeFromName(pod.Spec.NodeName)
				if err != nil {
					return nil, err
				}
				if node == nil {
					return nil, fmt.Errorf("cannot find node %s for pod ip %s", pod.Spec.NodeName, b.IP)
				}
				nextHop.ID = node.Name
				if n.plugin != ctx.NetworkPluginTerway {
					// Without terway the packet is sent to the node's
					// internal IP on the node port, once per node.
					if _, ok := nodeMap[node.Name]; ok {
						continue
					}
					// NOTE(review): net.ParseIP yields nil if
					// getNodeInternalIP returns an unparsable value — confirm
					// the helper always returns a valid address.
					backendPkt.Dst = net.ParseIP(getNodeInternalIP(node))
					backendPkt.Dport = nodePort
					nodeMap[node.Name] = struct{}{}
				}
			case model.EndpointTypeNode:
				node, err = n.ipCache.GetNodeFromIP(b.IP)
				if err != nil {
					return nil, err
				}
				if node == nil {
					return nil, fmt.Errorf("cannot find node for ip %s", b.IP)
				}
				nextHop.ID = node.Name
			}

			trans := model.Transmission{
				NextHop: nextHop,
				Link: &model.Link{
					Type:   model.LinkInfra,
					Source: n.netNode,
					Packet: backendPkt,
				},
			}
			transmission = append(transmission, trans)

			lbBackend := network.LoadBalancerBackend{
				IP:   backendPkt.Dst.String(),
				Port: backendPkt.Dport,
			}

			if n.plugin != ctx.NetworkPluginTerway && node != nil {
				// The backend ID is the second part of a two-part,
				// dot-separated provider ID (presumably
				// "<region>.<instance-id>" — confirm); any other shape
				// leaves the ID empty.
				provider := strings.Split(node.Spec.ProviderID, ".")
				if len(provider) == 2 {
					lbBackend.ID = provider[1]
				}
			}
			// todo: support terway plugin eni
			lbBackends = append(lbBackends, lbBackend)
		}
	} else {
		// lbBackends is left empty on this path.
		transmission, err = n.getTransmissionsToNodePort(nodePort, pkt)
		if err != nil {
			return nil, err
		}
	}

	sus, err := n.infraShim.ExternalToLoadBalancer(svc, pkt, lbBackends)
	if err != nil {
		return nil, err
	}

	n.netNode.Suspicions = append(n.netNode.Suspicions, sus...)
	links := lo.Map(transmission, func(t model.Transmission, _ int) *model.Link {
		return t.Link
	})
	n.netNode.DoAction(model.ActionService(upstream, links))

	return transmission, nil
}