pkg/skoop/nodemanager/net_node_manager.go (97 lines of code) (raw):

package nodemanager import ( "fmt" "sync" "github.com/alibaba/kubeskoop/pkg/skoop/collector" ctx "github.com/alibaba/kubeskoop/pkg/skoop/context" "github.com/alibaba/kubeskoop/pkg/skoop/k8s" "github.com/alibaba/kubeskoop/pkg/skoop/model" "github.com/alibaba/kubeskoop/pkg/skoop/plugin" "k8s.io/client-go/kubernetes" ) type NetNodeManager interface { GetNetNodeFromID(nodeType model.NetNodeType, id string) (model.NetNodeAction, error) } type defaultNetNodeManager struct { parent NetNodeManager client *kubernetes.Clientset ipCache *k8s.IPCache plugin plugin.Plugin collectorManager collector.Manager cache sync.Map } func NewNetNodeManager(ctx *ctx.Context, networkPlugin plugin.Plugin, collectorManager collector.Manager) (NetNodeManager, error) { return &defaultNetNodeManager{ client: ctx.KubernetesClient(), ipCache: ctx.ClusterConfig().IPCache, plugin: networkPlugin, collectorManager: collectorManager, }, nil } func NewNetNodeManagerWithParent(ctx *ctx.Context, parent NetNodeManager, networkPlugin plugin.Plugin, collectorManager collector.Manager) (NetNodeManager, error) { return &defaultNetNodeManager{ parent: parent, client: ctx.KubernetesClient(), ipCache: ctx.ClusterConfig().IPCache, plugin: networkPlugin, collectorManager: collectorManager, }, nil } func (m *defaultNetNodeManager) GetNetNodeFromID(nodeType model.NetNodeType, id string) (model.NetNodeAction, error) { key := m.cacheKey(nodeType, id) if node, ok := m.cache.Load(key); ok { return node.(model.NetNodeAction), nil } var ret model.NetNodeAction switch nodeType { case model.NetNodeTypePod: k8sPod, err := m.ipCache.GetPodFromIP(id) if err != nil { return nil, err } if k8sPod == nil { return nil, fmt.Errorf("k8s pod not found from ip %s", id) } podInfo, err := m.collectorManager.CollectPod(k8sPod.Namespace, k8sPod.Name) if err != nil { return nil, fmt.Errorf("error run collector for pod: %v", err) } ret, err = m.plugin.CreatePod(podInfo) if err != nil { return nil, fmt.Errorf("error create pod: %v", err) } case model.NetNodeTypeNode: nodeInfo, err := m.collectorManager.CollectNode(id) if err != nil { return nil, fmt.Errorf("error run collector for node: %v", err) } ret, err = m.plugin.CreateNode(nodeInfo) if err != nil { return nil, fmt.Errorf("error create node: %v", err) } default: if m.parent != nil { var err error ret, err = m.parent.GetNetNodeFromID(nodeType, id) if err != nil { return nil, err } } else { ret = &plugin.GenericNetNode{ NetNode: &model.NetNode{ Type: model.NetNodeTypeGeneric, ID: id, Actions: map[*model.Link]*model.Action{}, }, } } } if ret != nil { m.cache.Store(key, ret) } return ret, nil } func (m *defaultNetNodeManager) cacheKey(typ model.NetNodeType, id string) string { return fmt.Sprintf("%s---%s", typ, id) } var _ NetNodeManager = &defaultNetNodeManager{}