in pkg/skoop/plugin/calico.go [631:769]
func (h *calicoHost) ToService(upstream *model.Link, dst model.Endpoint, protocol model.Protocol, service *v1.Service) ([]model.Transmission, error) {
iif, err := h.detectIif(upstream)
if err != nil {
return nil, err
}
var pkt *model.Packet
if upstream != nil {
upstream.DestinationAttribute = model.SimpleLinkAttribute{Interface: iif}
pkt = upstream.Packet
} else {
pkt = &model.Packet{
Dst: net.ParseIP(dst.IP),
Dport: dst.Port,
Protocol: protocol,
}
src, _, err := h.nodeInfo.Router.RouteSrc(pkt, "", "")
if err != nil {
return nil, err
}
pkt.Src = net.ParseIP(src)
}
node, err := h.ipCache.GetNodeFromName(h.nodeInfo.NodeName)
if err != nil {
return nil, err
}
if len(service.Spec.LoadBalancerSourceRanges) > 0 {
cidrAllow := false
for _, sourceRange := range service.Spec.LoadBalancerSourceRanges {
_, cidr, err := net.ParseCIDR(sourceRange)
if err != nil {
klog.Errorf("source range(%s) format invalid, %v", cidr, err)
continue
}
if cidr.Contains(pkt.Src) {
cidrAllow = true
}
}
if !cidrAllow {
h.netNode.Suspicions = append(h.netNode.Suspicions, model.Suspicion{
Level: model.SuspicionLevelWarning,
Message: fmt.Sprintf("service sourcePortRange(%v) not allow source ip(%v) access", service.Spec.LoadBalancerSourceRanges, pkt.Src),
})
}
}
backends := h.serviceProcessor.Process(*pkt, service, node)
if len(backends) == 0 {
h.netNode.Suspicions = append(h.netNode.Suspicions, model.Suspicion{
Level: model.SuspicionLevelFatal,
Message: fmt.Sprintf("service %s/%s has no valid endpoint", service.Namespace, service.Name),
})
h.netNode.DoAction(model.ActionService(upstream, []*model.Link{}))
return nil, &assertions.CannotBuildTransmissionError{
SrcNode: h.netNode,
Err: fmt.Errorf("service %s/%s has no valid endpoint", service.Namespace, service.Name),
}
}
if err := h.serviceProcessor.Validate(*pkt, backends, h.nodeInfo.NetNS); err != nil {
h.netNode.Suspicions = append(h.netNode.Suspicions, model.Suspicion{
Level: model.SuspicionLevelFatal,
Message: fmt.Sprintf("validate endpoint of service %s/%s failed: %s", service.Namespace, service.Name, err),
})
}
var nfAssertion func(pktIn model.Packet, pktOut []model.Packet, iif string)
if upstream != nil {
nfAssertion = h.net.AssertNetfilterForward
} else {
nfAssertion = h.net.AssertNetfilterSend
}
var transmissions []model.Transmission
for _, backend := range backends {
pktOut := &model.Packet{
Src: pkt.Src,
Sport: pkt.Sport,
Dst: net.ParseIP(backend.IP),
Dport: backend.Port,
Protocol: protocol,
}
if backend.Masquerade {
ip, _, err := h.nodeInfo.Router.RouteSrc(pktOut, "", "")
if err != nil {
return nil, err
}
pktOut.Src = net.ParseIP(ip)
}
backendType, err := h.ipCache.GetIPType(backend.IP)
if err != nil {
return nil, err
}
switch backendType {
case model.EndpointTypePod:
pod, err := h.ipCache.GetPodFromIP(backend.IP)
if err != nil {
return nil, err
}
if pod == nil {
return nil, fmt.Errorf("cannot find pod from ip %s", backend.IP)
}
transmission, err := h.transmissionToPod(pktOut, pod, iif)
if err != nil {
return nil, err
}
transmissions = append(transmissions, transmission)
case model.EndpointTypeNode:
node, err := h.ipCache.GetNodeFromIP(backend.IP)
if err != nil {
return nil, err
}
if node == nil {
return nil, fmt.Errorf("cannot find node from ip %s", backend.IP)
}
transmission, err := h.transmissionToNode(pktOut, node, iif)
if err != nil {
return nil, err
}
transmissions = append(transmissions, transmission)
default:
transmission, err := h.transmissionToExternal(pktOut, iif)
if err != nil {
return nil, err
}
transmissions = append(transmissions, transmission)
}
}
links := lo.Map(transmissions, func(t model.Transmission, _ int) *model.Link { return t.Link })
h.netNode.DoAction(model.ActionService(upstream, links))
pktOutList := lo.Map(links, func(l *model.Link, _ int) model.Packet { return *l.Packet })
nfAssertion(*pkt, pktOutList, iif)
return transmissions, nil
}