in pkg/skoop/network/aliyun/node.go [66:199]
// Receive handles a packet arriving at this load balancer node over upstream
// and returns the transmissions that forward it towards the service backends.
//
// The service is looked up from the packet's destination IP and its backends
// are obtained from n.processor. A *assertions.CannotBuildTransmissionError is
// returned (and a suspicion is recorded on n.netNode) when the service has no
// backend, has more than 10 backends, or exposes no port matching the packet's
// destination port and protocol.
//
// When the service's external traffic policy is Local, or the network plugin
// is Terway, one transmission is built per backend; otherwise the
// transmissions come from getTransmissionsToNodePort. Suspicions reported by
// infraShim.ExternalToLoadBalancer are appended to n.netNode before the
// service action is recorded.
func (n *slbNode) Receive(upstream *model.Link) ([]model.Transmission, error) {
	upstream.Destination = n.netNode
	pkt := upstream.Packet

	svc, err := n.ipCache.GetServiceFromIP(pkt.Dst.String())
	if err != nil {
		return nil, err
	}

	// cannotBuild wraps msg in the error type shared by all validation failures below.
	cannotBuild := func(msg string) error {
		return &assertions.CannotBuildTransmissionError{
			SrcNode: n.netNode,
			Err:     errors.New(msg),
		}
	}

	backends := n.processor.Process(*pkt, svc, nil)
	backendCount := len(backends)
	switch {
	case backendCount == 0:
		msg := fmt.Sprintf("service %s/%s has no valid endpoint", svc.Namespace, svc.Name)
		n.netNode.AddSuspicion(model.SuspicionLevelFatal, msg)
		n.netNode.DoAction(model.ActionService(nil, []*model.Link{}))
		return nil, cannotBuild(msg)
	case backendCount > 10:
		msg := fmt.Sprintf("too many backends: %d, stop.", backendCount)
		n.netNode.AddSuspicion(model.SuspicionLevelInfo, msg)
		n.netNode.DoAction(model.ActionService(nil, []*model.Link{}))
		return nil, cannotBuild(msg)
	}

	// The packet must target a port/protocol pair that the service declares.
	matchesPacket := func(sp v1.ServicePort) bool {
		return sp.Port == int32(pkt.Dport) && strings.EqualFold(string(sp.Protocol), string(pkt.Protocol))
	}
	if !lo.ContainsBy(svc.Spec.Ports, matchesPacket) {
		msg := fmt.Sprintf("cannot find port %d protocol %s on service %s/%s",
			pkt.Dport, pkt.Protocol, svc.Namespace, svc.Name)
		n.netNode.AddSuspicion(model.SuspicionLevelFatal, msg)
		return nil, cannotBuild(msg)
	}

	nodePort := service.GetNodePort(svc, pkt.Dport, pkt.Protocol)
	targetPort := service.GetTargetPort(svc, pkt.Dport, pkt.Protocol)

	var (
		transmission []model.Transmission
		lbBackends   []network.LoadBalancerBackend
	)
	isTerway := n.plugin == ctx.NetworkPluginTerway

	if !isTerway && svc.Spec.ExternalTrafficPolicy != v1.ServiceExternalTrafficPolicyTypeLocal {
		transmission, err = n.getTransmissionsToNodePort(nodePort, pkt)
		if err != nil {
			return nil, err
		}
	} else {
		// terway plugin will add pod ip directly to the loadbalancer backend
		visitedNodes := make(map[string]struct{})
		for _, endpoint := range backends {
			epType, err := n.ipCache.GetIPType(endpoint.IP)
			if err != nil {
				return nil, err
			}

			hop := model.Hop{Type: model.NetNodeTypeNode}

			// By default the forwarded packet targets the backend IP on the target port.
			fwd := pkt.DeepCopy()
			fwd.Dst = net.ParseIP(endpoint.IP)
			fwd.Dport = targetPort

			var node *v1.Node
			switch epType {
			case model.EndpointTypePod:
				pod, err := n.ipCache.GetPodFromIP(endpoint.IP)
				if err != nil {
					return nil, err
				}
				node, err = n.ipCache.GetNodeFromName(pod.Spec.NodeName)
				if err != nil {
					return nil, err
				}
				hop.ID = node.Name
				if !isTerway {
					// Without Terway the packet is sent to the hosting node's
					// node port instead, at most once per node.
					if _, seen := visitedNodes[node.Name]; seen {
						continue
					}
					fwd.Dst = net.ParseIP(getNodeInternalIP(node))
					fwd.Dport = nodePort
					visitedNodes[node.Name] = struct{}{}
				}
			case model.EndpointTypeNode:
				node, err = n.ipCache.GetNodeFromIP(endpoint.IP)
				if err != nil {
					return nil, err
				}
				hop.ID = node.Name
			}

			transmission = append(transmission, model.Transmission{
				NextHop: hop,
				Link: &model.Link{
					Type:   model.LinkInfra,
					Source: n.netNode,
					Packet: fwd,
				},
			})

			// The backend ID is the second segment of a two-segment,
			// dot-separated provider ID; any other shape leaves it empty.
			var backendID string
			if !isTerway && node != nil {
				if parts := strings.Split(node.Spec.ProviderID, "."); len(parts) == 2 {
					backendID = parts[1]
				}
			}
			// todo: support terway plugin eni
			lbBackends = append(lbBackends, network.LoadBalancerBackend{
				ID:   backendID,
				IP:   fwd.Dst.String(),
				Port: fwd.Dport,
			})
		}
	}

	sus, err := n.infraShim.ExternalToLoadBalancer(svc, pkt, lbBackends)
	if err != nil {
		return nil, err
	}
	n.netNode.Suspicions = append(n.netNode.Suspicions, sus...)

	links := make([]*model.Link, len(transmission))
	for i := range transmission {
		links[i] = transmission[i].Link
	}
	n.netNode.DoAction(model.ActionService(upstream, links))
	return transmission, nil
}