pkg/skoop/k8s/ip_cache.go (243 lines of code) (raw):
package k8s
import (
"context"
"fmt"
"strings"
"sync"
"github.com/alibaba/kubeskoop/pkg/skoop/utils"
"github.com/samber/lo"
"github.com/alibaba/kubeskoop/pkg/skoop/model"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// IPCache is an in-memory index of cluster pods, nodes and services keyed by
// address string. It is filled from the API server by BuildClusterIPCache,
// which the lookup methods call before reading the maps.
type IPCache struct {
// k8sCli is the client used to list pods, nodes and services.
k8sCli *kubernetes.Clientset
// podCache maps each address in a pod's Status.PodIPs to that pod.
podCache map[string]*v1.Pod
// nodeCache maps each entry of a node's Status.Addresses to that node.
nodeCache map[string]*v1.Node
// serviceCache maps cluster IPs, external IPs and load balancer ingress
// IPs to the owning service.
serviceCache map[string]*v1.Service
// cacheOnce guards the one-time build; it is replaced with a fresh Once
// by ClearCache and after a failed build so the cache can be rebuilt.
cacheOnce *sync.Once
}
// NewIPCache returns an IPCache bound to k8sCli with empty lookup tables.
// Nothing is fetched here; the tables are populated by BuildClusterIPCache.
func NewIPCache(k8sCli *kubernetes.Clientset) *IPCache {
	cache := new(IPCache)
	cache.k8sCli = k8sCli
	cache.podCache = make(map[string]*v1.Pod)
	cache.nodeCache = make(map[string]*v1.Node)
	cache.serviceCache = make(map[string]*v1.Service)
	cache.cacheOnce = new(sync.Once)
	return cache
}
// ClearCache drops every cached entry and installs a fresh sync.Once so the
// next call to BuildClusterIPCache lists the cluster again.
func (c *IPCache) ClearCache() {
	c.podCache = make(map[string]*v1.Pod)
	c.nodeCache = make(map[string]*v1.Node)
	c.serviceCache = make(map[string]*v1.Service)
	c.cacheOnce = new(sync.Once)
}
// GetPodFromIP returns the pod cached under ip, or nil when no pod is
// registered for that address. An error is returned only if the cache could
// not be built.
func (c *IPCache) GetPodFromIP(ip string) (*v1.Pod, error) {
	if err := c.BuildClusterIPCache(); err != nil {
		return nil, err
	}
	pod := c.podCache[ip]
	return pod, nil
}
// GetPodFromName scans the pod cache for a pod with the given namespace and
// name. It returns (nil, nil) when no cached pod matches, and an error only
// if the cache could not be built.
func (c *IPCache) GetPodFromName(namespace, name string) (*v1.Pod, error) {
	if err := c.BuildClusterIPCache(); err != nil {
		return nil, err
	}
	for _, candidate := range c.podCache {
		if candidate.Namespace != namespace || candidate.Name != name {
			continue
		}
		return candidate, nil
	}
	return nil, nil
}
// GetNodeFromIP returns the node cached under ip, or nil when no node is
// registered for that address. An error is returned only if the cache could
// not be built.
func (c *IPCache) GetNodeFromIP(ip string) (*v1.Node, error) {
	if err := c.BuildClusterIPCache(); err != nil {
		return nil, err
	}
	node := c.nodeCache[ip]
	return node, nil
}
// GetNodeFromName scans the node cache for a node called name. It returns
// (nil, nil) when no cached node matches, and an error only if the cache
// could not be built.
func (c *IPCache) GetNodeFromName(name string) (*v1.Node, error) {
	if err := c.BuildClusterIPCache(); err != nil {
		return nil, err
	}
	for _, candidate := range c.nodeCache {
		if candidate.Name != name {
			continue
		}
		return candidate, nil
	}
	return nil, nil
}
// GetServiceFromIP returns the service cached under ip, or nil when no
// service is registered for that address. An error is returned only if the
// cache could not be built.
func (c *IPCache) GetServiceFromIP(ip string) (*v1.Service, error) {
	if err := c.BuildClusterIPCache(); err != nil {
		return nil, err
	}
	svc := c.serviceCache[ip]
	return svc, nil
}
// GetServiceFromNodePort returns a cached service that exposes nodePort for
// the given protocol (compared case-insensitively). It returns (nil, nil)
// when no service matches, and an error only if the cache could not be built.
//
// A nodePort of 0 never matches: service ports without an allocated node port
// carry NodePort == 0, so treating 0 as a real port would return an arbitrary
// service.
func (c *IPCache) GetServiceFromNodePort(nodePort uint16, protocol model.Protocol) (*v1.Service, error) {
	if err := c.BuildClusterIPCache(); err != nil {
		return nil, err
	}
	if nodePort == 0 {
		return nil, nil
	}

	// Compare in the int32 domain of ServicePort.NodePort so an out-of-range
	// value is never truncated into a false match.
	want := int32(nodePort)
	for _, svc := range c.serviceCache {
		for i := range svc.Spec.Ports {
			port := &svc.Spec.Ports[i]
			if port.NodePort == want &&
				strings.EqualFold(string(port.Protocol), string(protocol)) {
				return svc, nil
			}
		}
	}
	return nil, nil
}
// GetIPType classifies ip by checking the caches in order: pod, node, then
// service. A service hit is reported as a load balancer endpoint when the
// service is of type LoadBalancer and utils.ContainsLoadBalancerIP reports
// true for ip; otherwise it is a plain service endpoint. Addresses found in
// no cache are reported as external. An error is returned only if the cache
// could not be built.
func (c *IPCache) GetIPType(ip string) (model.EndpointType, error) {
	if err := c.BuildClusterIPCache(); err != nil {
		return "", err
	}

	_, isPod := c.podCache[ip]
	_, isNode := c.nodeCache[ip]
	svc, isService := c.serviceCache[ip]

	switch {
	case isPod:
		return model.EndpointTypePod, nil
	case isNode:
		return model.EndpointTypeNode, nil
	case isService && svc.Spec.Type == v1.ServiceTypeLoadBalancer && utils.ContainsLoadBalancerIP(svc, ip):
		return model.EndpointTypeLoadbalancer, nil
	case isService:
		return model.EndpointTypeService, nil
	default:
		return model.EndpointTypeExternal, nil
	}
}
// BuildClusterIPCache populates the pod, node and service caches, in that
// order, the first time it is called; later calls are no-ops while the
// current sync.Once has fired.
//
// If any step fails, the partially filled caches are discarded and the Once
// is replaced so that a later call retries from a clean state. Keeping the
// partial maps would make the retry re-add the same entries, logging spurious
// conflict warnings and retaining stale objects. The returned error wraps the
// underlying cause.
func (c *IPCache) BuildClusterIPCache() error {
	var buildErr error
	c.cacheOnce.Do(func() {
		steps := []func() error{
			c.buildPodCache,
			c.buildNodeCache,
			c.buildServiceCache,
		}
		for _, step := range steps {
			if buildErr = step(); buildErr != nil {
				return
			}
		}
	})
	if buildErr == nil {
		return nil
	}

	// Drop whatever was cached before the failure and re-arm the build.
	c.podCache = map[string]*v1.Pod{}
	c.nodeCache = map[string]*v1.Node{}
	c.serviceCache = map[string]*v1.Service{}
	c.cacheOnce = &sync.Once{}
	return fmt.Errorf("error build cluster ip cache: %w", buildErr)
}
// GetNodes returns every cached node exactly once, in unspecified order. An
// error is returned only if the cache could not be built.
//
// The node cache holds one entry per node address, and all entries of a node
// share the same pointer, so the map values are de-duplicated by pointer
// before being returned.
func (c *IPCache) GetNodes() ([]*v1.Node, error) {
	if err := c.BuildClusterIPCache(); err != nil {
		return nil, err
	}
	return lo.Uniq(lo.Values(c.nodeCache)), nil
}
// addPodIPCache registers pod under ipaddr. If the address is already taken
// the existing entry is kept and a warning is logged. The returned error is
// always nil.
func (c *IPCache) addPodIPCache(ipaddr string, pod *v1.Pod) error {
	existing, taken := c.podCache[ipaddr]
	if !taken {
		c.podCache[ipaddr] = pod
		return nil
	}
	klog.Warningf("Pod %s/%s address %s conflict with %s/%s, ignoring.", pod.Namespace, pod.Name, ipaddr, existing.Namespace, existing.Name)
	return nil
}
// addNodeIPCache registers node under ipaddr. If the address is already taken
// the existing entry is kept and a warning is logged. The returned error is
// always nil.
func (c *IPCache) addNodeIPCache(ipaddr string, node *v1.Node) error {
	existing, taken := c.nodeCache[ipaddr]
	if !taken {
		c.nodeCache[ipaddr] = node
		return nil
	}
	klog.Warningf("Node %s address %s conflict with %s, ignoring.", node.Name, ipaddr, existing.Name)
	return nil
}
// addServiceIPCache registers svc under ipaddr. If the address is already
// taken the existing entry is kept and a warning is logged. The returned
// error is always nil.
func (c *IPCache) addServiceIPCache(ipaddr string, svc *v1.Service) error {
	existing, taken := c.serviceCache[ipaddr]
	if !taken {
		c.serviceCache[ipaddr] = svc
		return nil
	}
	klog.Warningf("Service %s/%s address %s conflict with %s/%s, ignoring.", svc.Namespace, svc.Name, ipaddr, existing.Namespace, existing.Name)
	return nil
}
// buildPodCache lists pods in all namespaces and indexes every running,
// non-host-network pod under each address in its Status.PodIPs. Skipped pods
// are logged at verbosity 4. A deep copy of the pod is stored so the cache
// does not alias the list result.
func (c *IPCache) buildPodCache() error {
	pods, err := c.k8sCli.CoreV1().Pods("").List(context.Background(), meta_v1.ListOptions{})
	if err != nil {
		return err
	}

	for i := range pods.Items {
		pod := &pods.Items[i]
		eligible := !pod.Spec.HostNetwork && pod.Status.Phase == v1.PodRunning
		if !eligible {
			klog.V(4).Infof("Pod %s/%s skipped, hostNetwork: %t, phase: %s, address: %v",
				pod.Namespace, pod.Name, pod.Spec.HostNetwork, pod.Status.Phase, pod.Status.PodIP)
			continue
		}

		cached := pod.DeepCopy()
		for _, podIP := range cached.Status.PodIPs {
			if addErr := c.addPodIPCache(podIP.IP, cached); addErr != nil {
				return addErr
			}
		}
	}
	return nil
}
// buildServiceCache lists services in all namespaces and indexes each one
// under its cluster IPs, external IPs and load balancer ingress IPs. A deep
// copy of the service is stored so the cache does not alias the list result.
//
// Empty addresses are skipped: a load balancer ingress entry that only
// carries a hostname has an empty IP, and indexing it would register the
// service under the key "" and trigger conflict warnings for every further
// such service. The headless marker "None" is skipped as well.
func (c *IPCache) buildServiceCache() error {
	svcList, err := c.k8sCli.CoreV1().Services("").List(context.Background(), meta_v1.ListOptions{})
	if err != nil {
		return err
	}

	for i := range svcList.Items {
		svc := svcList.Items[i].DeepCopy()

		// register indexes svc under addr unless addr is not a usable IP.
		register := func(addr string) error {
			if addr == "" || addr == v1.ClusterIPNone {
				return nil
			}
			return c.addServiceIPCache(addr, svc)
		}

		for _, clusterIP := range svc.Spec.ClusterIPs {
			if err := register(clusterIP); err != nil {
				return err
			}
		}
		for _, externalIP := range svc.Spec.ExternalIPs {
			if err := register(externalIP); err != nil {
				return err
			}
		}
		for _, ingress := range svc.Status.LoadBalancer.Ingress {
			if err := register(ingress.IP); err != nil {
				return err
			}
		}
	}
	return nil
}
// buildNodeCache lists all nodes and indexes each one under every entry of
// its Status.Addresses. A deep copy of the node is stored so the cache does
// not alias the list result.
//
// NOTE(review): every address entry is indexed regardless of its type, so
// non-IP values (e.g. hostnames) presumably end up as keys too — confirm
// whether callers rely on that.
func (c *IPCache) buildNodeCache() error {
	nodes, err := c.k8sCli.CoreV1().Nodes().List(context.Background(), meta_v1.ListOptions{})
	if err != nil {
		return err
	}

	for i := range nodes.Items {
		cached := nodes.Items[i].DeepCopy()
		for _, addr := range cached.Status.Addresses {
			if addErr := c.addNodeIPCache(addr.Address, cached); addErr != nil {
				return addErr
			}
		}
	}
	return nil
}