pkg/skoop/network/aliyun/node.go (332 lines of code) (raw):
package aliyun
import (
"errors"
"fmt"
"net"
"strings"
"github.com/alibaba/kubeskoop/pkg/skoop/assertions"
ctx "github.com/alibaba/kubeskoop/pkg/skoop/context"
"github.com/alibaba/kubeskoop/pkg/skoop/k8s"
"github.com/alibaba/kubeskoop/pkg/skoop/model"
"github.com/alibaba/kubeskoop/pkg/skoop/network"
"github.com/alibaba/kubeskoop/pkg/skoop/plugin"
"github.com/alibaba/kubeskoop/pkg/skoop/service"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
)
const (
NetNodeTypeSLB = "slb"
)
type netNodeManager struct {
infraShim network.InfraShim
ipCache *k8s.IPCache
processor service.Processor
pluginName string
}
func (n *netNodeManager) GetNetNodeFromID(nodeType model.NetNodeType, id string) (model.NetNodeAction, error) {
switch nodeType {
case model.NetNodeTypePod, model.NetNodeTypeNode:
return nil, fmt.Errorf("unspoorted type %s for aliyun net node manager", nodeType)
case NetNodeTypeSLB:
return newSLBNode(n.infraShim, n.ipCache, n.processor, n.pluginName, net.ParseIP(id), id)
default:
return newExternalNode(n.infraShim, n.ipCache, n.processor, n.pluginName, net.ParseIP(id), id)
}
}
type externalNode struct {
ip net.IP
netNode *model.NetNode
genericNode *plugin.GenericNetNode
infraShim network.InfraShim
ipCache *k8s.IPCache
processor service.Processor
plugin string
}
type slbNode struct {
ip net.IP
netNode *model.NetNode
genericNode *plugin.GenericNetNode
infraShim network.InfraShim
ipCache *k8s.IPCache
processor service.Processor
plugin string
}
func (n *slbNode) Send(_ model.Endpoint, _ model.Protocol) ([]model.Transmission, error) {
return nil, errors.New("can not send a packet from a loadbalancer node")
}
func (n *slbNode) Receive(upstream *model.Link) ([]model.Transmission, error) {
upstream.Destination = n.netNode
pkt := upstream.Packet
svc, err := n.ipCache.GetServiceFromIP(pkt.Dst.String())
if err != nil {
return nil, err
}
backends := n.processor.Process(*pkt, svc, nil)
if len(backends) == 0 {
m := fmt.Sprintf("service %s/%s has no valid endpoint", svc.Namespace, svc.Name)
n.netNode.AddSuspicion(model.SuspicionLevelFatal, m)
n.netNode.DoAction(model.ActionService(nil, []*model.Link{}))
return nil, &assertions.CannotBuildTransmissionError{
SrcNode: n.netNode,
Err: errors.New(m),
}
}
if len(backends) > 10 {
m := fmt.Sprintf("too many backends: %d, stop.", len(backends))
n.netNode.AddSuspicion(model.SuspicionLevelInfo, m)
n.netNode.DoAction(model.ActionService(nil, []*model.Link{}))
return nil, &assertions.CannotBuildTransmissionError{
SrcNode: n.netNode,
Err: errors.New(m),
}
}
if !lo.ContainsBy(svc.Spec.Ports, func(p v1.ServicePort) bool {
return p.Port == int32(pkt.Dport) && strings.EqualFold(string(p.Protocol), string(pkt.Protocol))
}) {
m := fmt.Sprintf("cannot find port %d protocol %s on service %s/%s",
pkt.Dport, pkt.Protocol, svc.Namespace, svc.Name)
n.netNode.AddSuspicion(model.SuspicionLevelFatal, m)
return nil, &assertions.CannotBuildTransmissionError{
SrcNode: n.netNode,
Err: errors.New(m),
}
}
nodePort := service.GetNodePort(svc, pkt.Dport, pkt.Protocol)
targetPort := service.GetTargetPort(svc, pkt.Dport, pkt.Protocol)
var transmission []model.Transmission
var lbBackends []network.LoadBalancerBackend
nodeMap := map[string]struct{}{}
if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal || n.plugin == ctx.NetworkPluginTerway {
// terway plugin will add pod ip directly to the loadbalancer backend
for _, b := range backends {
t, err := n.ipCache.GetIPType(b.IP)
if err != nil {
return nil, err
}
nextHop := model.Hop{
Type: model.NetNodeTypeNode,
}
pkt := pkt.DeepCopy()
pkt.Dst = net.ParseIP(b.IP)
pkt.Dport = targetPort
var node *v1.Node
switch t {
case model.EndpointTypePod:
pod, err := n.ipCache.GetPodFromIP(b.IP)
if err != nil {
return nil, err
}
node, err = n.ipCache.GetNodeFromName(pod.Spec.NodeName)
if err != nil {
return nil, err
}
nextHop.ID = node.Name
if n.plugin != ctx.NetworkPluginTerway {
if _, ok := nodeMap[node.Name]; ok {
continue
}
pkt.Dst = net.ParseIP(getNodeInternalIP(node))
pkt.Dport = nodePort
nodeMap[node.Name] = struct{}{}
}
case model.EndpointTypeNode:
node, err = n.ipCache.GetNodeFromIP(b.IP)
if err != nil {
return nil, err
}
nextHop.ID = node.Name
}
trans := model.Transmission{
NextHop: nextHop,
Link: &model.Link{
Type: model.LinkInfra,
Source: n.netNode,
Packet: pkt,
},
}
transmission = append(transmission, trans)
lbBackend := network.LoadBalancerBackend{
IP: pkt.Dst.String(),
Port: pkt.Dport,
}
if n.plugin != ctx.NetworkPluginTerway && node != nil {
provider := strings.Split(node.Spec.ProviderID, ".")
if len(provider) == 2 {
lbBackend.ID = provider[1]
}
}
// todo: support terway plugin eni
lbBackends = append(lbBackends, lbBackend)
}
} else {
var err error
transmission, err = n.getTransmissionsToNodePort(nodePort, pkt)
if err != nil {
return nil, err
}
}
sus, err := n.infraShim.ExternalToLoadBalancer(svc, pkt, lbBackends)
if err != nil {
return nil, err
}
n.netNode.Suspicions = append(n.netNode.Suspicions, sus...)
links := lo.Map(transmission, func(t model.Transmission, _ int) *model.Link {
return t.Link
})
n.netNode.DoAction(model.ActionService(upstream, links))
return transmission, nil
}
func (n *slbNode) getTransmissionsToNodePort(nodePort uint16, pkt *model.Packet) ([]model.Transmission, error) {
nodes, err := n.ipCache.GetNodes()
if err != nil {
return nil, err
}
// todo: exclude nodes with specific labels
return lo.Map(nodes, func(node *v1.Node, _ int) model.Transmission {
hop := model.Hop{
Type: model.NetNodeTypeNode,
ID: node.Name,
}
ip := getNodeInternalIP(node)
pkt := pkt.DeepCopy()
pkt.Dst = net.ParseIP(ip)
pkt.Dport = nodePort
return model.Transmission{
NextHop: hop,
Link: &model.Link{
Type: model.LinkInfra,
Source: n.netNode,
Packet: pkt,
},
}
}), nil
}
func newExternalNode(infraShim network.InfraShim, ipCache *k8s.IPCache, processor service.Processor, pluginName string, ip net.IP, id string) (model.NetNodeAction, error) {
netNode := &model.NetNode{
Type: model.NetNodeTypeExternal,
ID: id,
Actions: map[*model.Link]*model.Action{},
}
return &externalNode{
ip: ip,
ipCache: ipCache,
processor: processor,
netNode: netNode,
genericNode: &plugin.GenericNetNode{NetNode: netNode},
infraShim: infraShim,
plugin: pluginName,
}, nil
}
func newSLBNode(infraShim network.InfraShim, ipCache *k8s.IPCache, processor service.Processor, pluginName string, ip net.IP, id string) (model.NetNodeAction, error) {
netNode := &model.NetNode{
Type: NetNodeTypeSLB,
ID: id,
Actions: map[*model.Link]*model.Action{},
}
return &slbNode{
ip: ip,
ipCache: ipCache,
processor: processor,
netNode: netNode,
genericNode: &plugin.GenericNetNode{NetNode: netNode},
infraShim: infraShim,
plugin: pluginName,
}, nil
}
func (n *externalNode) Send(dst model.Endpoint, protocol model.Protocol) ([]model.Transmission, error) {
t, err := n.ipCache.GetIPType(dst.IP)
if err != nil {
return nil, err
}
switch t {
case model.EndpointTypePod:
pod, err := n.ipCache.GetPodFromIP(dst.IP)
if err != nil {
return nil, err
}
node, err := n.ipCache.GetNodeFromName(pod.Spec.NodeName)
if err != nil {
return nil, err
}
return n.sendToNode(dst, protocol, node)
case model.EndpointTypeNode:
node, err := n.ipCache.GetNodeFromIP(dst.IP)
if err != nil {
return nil, err
}
return n.sendToNode(dst, protocol, node)
case model.EndpointTypeLoadbalancer:
return n.sendToLoadBalancer(dst, protocol)
case model.EndpointTypeExternal, model.EndpointTypeService:
msg := fmt.Sprintf("cannot send packet from external ip to %s ip %s", t, dst.IP)
n.netNode.AddSuspicion(model.SuspicionLevelFatal, msg)
return nil, &assertions.CannotBuildTransmissionError{
SrcNode: n.netNode,
Err: errors.New(msg),
}
default:
msg := fmt.Sprintf("not supported endpoint type %s", t)
n.netNode.AddSuspicion(model.SuspicionLevelFatal, msg)
return nil, &assertions.CannotBuildTransmissionError{
SrcNode: n.netNode,
Err: errors.New(msg),
}
}
}
func (n *externalNode) sendToNode(dst model.Endpoint, protocol model.Protocol, node *v1.Node) ([]model.Transmission, error) {
pkt := &model.Packet{
Src: n.ip,
Dst: net.ParseIP(dst.IP),
Dport: dst.Port,
Protocol: protocol,
}
sus, err := n.infraShim.ExternalToNode(node, pkt)
if err != nil {
return nil, err
}
n.netNode.Suspicions = append(n.netNode.Suspicions, sus...)
trans := model.Transmission{
NextHop: model.Hop{
Type: model.NetNodeTypeNode,
ID: node.Name,
},
Link: &model.Link{
Type: model.LinkInfra,
Source: n.netNode,
Packet: pkt,
},
}
action := model.ActionSend([]*model.Link{trans.Link})
n.netNode.DoAction(action)
return []model.Transmission{trans}, nil
}
func (n *externalNode) sendToLoadBalancer(dst model.Endpoint, protocol model.Protocol) ([]model.Transmission, error) {
pkt := &model.Packet{
Src: n.ip,
Dst: net.ParseIP(dst.IP),
Dport: dst.Port,
Protocol: protocol,
}
hop := model.Hop{
Type: NetNodeTypeSLB,
ID: dst.IP,
}
trans := model.Transmission{
NextHop: hop,
Link: &model.Link{
Type: model.LinkExternal,
Source: n.netNode,
Packet: pkt,
},
}
n.netNode.DoAction(model.ActionSend([]*model.Link{trans.Link}))
return []model.Transmission{trans}, nil
}
func (n *externalNode) Receive(upstream *model.Link) ([]model.Transmission, error) {
return n.genericNode.Receive(upstream)
}
func getNodeInternalIP(node *v1.Node) string {
for _, n := range node.Status.Addresses {
if n.Type == v1.NodeInternalIP {
return n.Address
}
}
return ""
}