pkg/skoop/network/aliyun/network.go (171 lines of code) (raw):
package aliyun
import (
"context"
"fmt"
"strings"
"github.com/alibaba/kubeskoop/pkg/skoop/collector"
"github.com/alibaba/kubeskoop/pkg/skoop/collector/manager"
ctx "github.com/alibaba/kubeskoop/pkg/skoop/context"
"github.com/alibaba/kubeskoop/pkg/skoop/infra/aliyun"
"github.com/alibaba/kubeskoop/pkg/skoop/model"
"github.com/alibaba/kubeskoop/pkg/skoop/network"
"github.com/alibaba/kubeskoop/pkg/skoop/nodemanager"
"github.com/alibaba/kubeskoop/pkg/skoop/plugin"
"github.com/alibaba/kubeskoop/pkg/skoop/service"
"github.com/alibaba/kubeskoop/pkg/skoop/skoop"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)
// flannelNetwork holds the components assembled by NewFlannelNetwork: the
// network plugin, the diagnostor that Diagnose delegates to, the collector
// manager and the net node manager.
type flannelNetwork struct {
	plugin           plugin.Plugin
	diagnostor       skoop.Diagnostor
	collectorManager collector.Manager
	netNodeManager   nodemanager.NetNodeManager
}
// calicoNetwork holds the components assembled by NewCalicoNetwork: the
// network plugin, the diagnostor that Diagnose delegates to, the collector
// manager and the net node manager.
type calicoNetwork struct {
	plugin           plugin.Plugin
	diagnostor       skoop.Diagnostor
	collectorManager collector.Manager
	netNodeManager   nodemanager.NetNodeManager
}
// terwayNetwork is the stateless value returned by NewTerwayNetwork; it
// carries no fields.
type terwayNetwork struct{}
// getRegionAndInstanceID lists the cluster's nodes and derives an aliyun
// region and an instance ID from them.
//
// The region comes from the "topology.kubernetes.io/region" node label, or,
// when that label is absent, from "failure-domain.beta.kubernetes.io/region".
// The instance ID is the second component of a node's Spec.ProviderID when
// that value splits into exactly two parts on ".". Scanning stops as soon as
// both values are non-empty; they are tracked independently, so they may
// originate from different nodes.
//
// It returns the error from listing nodes unchanged, or a dedicated error when
// either value is still empty after every node has been examined.
func getRegionAndInstanceID(ctx *ctx.Context) (string, string, error) {
	nodeList, err := ctx.KubernetesClient().CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return "", "", err
	}

	// Region label keys in priority order: the first key present on a node wins.
	regionLabelKeys := []string{
		"topology.kubernetes.io/region",
		"failure-domain.beta.kubernetes.io/region",
	}

	var regionID, instanceID string
	for i := range nodeList.Items {
		item := &nodeList.Items[i]

		for _, key := range regionLabelKeys {
			if value, found := item.Labels[key]; found {
				regionID = value
				break
			}
		}

		// Only a provider ID made of exactly two dot-separated parts is used.
		if parts := strings.Split(item.Spec.ProviderID, "."); len(parts) == 2 {
			instanceID = parts[1]
		}

		if regionID != "" && instanceID != "" {
			break
		}
	}

	klog.V(3).Infof("Found region %q, instance %q", regionID, instanceID)

	if regionID == "" || instanceID == "" {
		return "", "", fmt.Errorf("cannot find aliyun region or instance id in cluster")
	}
	return regionID, instanceID, nil
}
// buildCloudManager creates an aliyun.CloudManager for the given region and
// cluster instance. Credentials (access key ID, access key secret and security
// token) are read from the package-level aliyun.Config. The context argument
// is unused.
func buildCloudManager(_ *ctx.Context, region, instanceOfCluster string) (*aliyun.CloudManager, error) {
	opts := aliyun.CloudManagerOptions{
		Region:            region,
		AccessKeyID:       aliyun.Config.AccessKeyID,
		AccessKeySecret:   aliyun.Config.AccessKeySecret,
		SecurityToken:     aliyun.Config.SecurityToken,
		InstanceOfCluster: instanceOfCluster,
	}
	return aliyun.NewCloudManager(&opts)
}
// buildNetNodeManager creates the package's netNodeManager from the infra
// shim, the service processor and the cluster configuration (IP cache and
// network plugin name), and hands it as the parent to
// nodemanager.NewNetNodeManagerWithParent together with the plugin and the
// collector manager.
func buildNetNodeManager(ctx *ctx.Context, plgn plugin.Plugin, infraShim network.InfraShim, serviceProcessor service.Processor, collectorManager collector.Manager) (nodemanager.NetNodeManager, error) {
	ipCache := ctx.ClusterConfig().IPCache
	pluginName := ctx.ClusterConfig().NetworkPlugin

	return nodemanager.NewNetNodeManagerWithParent(
		ctx,
		&netNodeManager{
			infraShim:  infraShim,
			ipCache:    ipCache,
			processor:  serviceProcessor,
			pluginName: pluginName,
		},
		plgn,
		collectorManager,
	)
}
// NewFlannelNetwork assembles a network.Network for a cluster that uses the
// Flannel plugin.
//
// It discovers the region and instance ID from the cluster nodes, builds the
// cloud manager and infra shim, and then wires together the kube-proxy service
// processor, the Flannel plugin, the pod collector manager, the network
// policy, the net node manager and the default diagnostor. The first error
// from any step is returned unchanged together with a nil network.
func NewFlannelNetwork(ctx *ctx.Context) (network.Network, error) {
	// Cloud-side dependencies.
	regionID, instanceID, err := getRegionAndInstanceID(ctx)
	if err != nil {
		return nil, err
	}

	cloud, err := buildCloudManager(ctx, regionID, instanceID)
	if err != nil {
		return nil, err
	}

	shim, err := NewInfraShim(cloud)
	if err != nil {
		return nil, err
	}

	// Cluster-side components.
	svcProcessor := service.NewKubeProxyServiceProcessor(ctx)

	flannel, err := plugin.NewFlannelPlugin(ctx, svcProcessor, shim)
	if err != nil {
		return nil, err
	}

	podCollectors, err := manager.NewSimplePodCollectorManager(ctx)
	if err != nil {
		return nil, err
	}

	// The meaning of the two boolean flags is defined by plugin.NewNetworkPolicy.
	policy, err := plugin.NewNetworkPolicy(false, false, ctx.ClusterConfig().IPCache, ctx.KubernetesClient(), svcProcessor)
	if err != nil {
		return nil, err
	}

	nodeMgr, err := buildNetNodeManager(ctx, flannel, shim, svcProcessor, podCollectors)
	if err != nil {
		return nil, err
	}

	diag, err := skoop.NewDefaultDiagnostor(ctx, nodeMgr, policy)
	if err != nil {
		return nil, err
	}

	result := &flannelNetwork{
		plugin:           flannel,
		diagnostor:       diag,
		collectorManager: podCollectors,
		netNodeManager:   nodeMgr,
	}
	return result, nil
}
// NewCalicoNetwork assembles a network.Network for a cluster that uses the
// Calico plugin.
//
// It discovers the region and instance ID from the cluster nodes, builds the
// cloud manager and infra shim, and then wires together the kube-proxy service
// processor, the Calico plugin, the pod collector manager, the net node
// manager, the network policy and the default diagnostor. The first error from
// any step is returned unchanged together with a nil network.
func NewCalicoNetwork(ctx *ctx.Context) (network.Network, error) {
	// Cloud-side dependencies.
	regionID, instanceID, err := getRegionAndInstanceID(ctx)
	if err != nil {
		return nil, err
	}

	cloud, err := buildCloudManager(ctx, regionID, instanceID)
	if err != nil {
		return nil, err
	}

	shim, err := NewInfraShim(cloud)
	if err != nil {
		return nil, err
	}

	// Cluster-side components.
	svcProcessor := service.NewKubeProxyServiceProcessor(ctx)

	calico, err := plugin.NewCalicoPlugin(ctx, svcProcessor, shim)
	if err != nil {
		return nil, err
	}

	podCollectors, err := manager.NewSimplePodCollectorManager(ctx)
	if err != nil {
		return nil, err
	}

	nodeMgr, err := buildNetNodeManager(ctx, calico, shim, svcProcessor, podCollectors)
	if err != nil {
		return nil, err
	}

	// The meaning of the two boolean flags is defined by plugin.NewNetworkPolicy.
	policy, err := plugin.NewNetworkPolicy(true, false, ctx.ClusterConfig().IPCache, ctx.KubernetesClient(), svcProcessor)
	if err != nil {
		return nil, err
	}

	diag, err := skoop.NewDefaultDiagnostor(ctx, nodeMgr, policy)
	if err != nil {
		return nil, err
	}

	result := &calicoNetwork{
		plugin:           calico,
		diagnostor:       diag,
		collectorManager: podCollectors,
		netNodeManager:   nodeMgr,
	}
	return result, nil
}
// NewTerwayNetwork returns a pointer to a zero-value terwayNetwork as a
// network.Network. It performs no other work and always returns a nil error.
func NewTerwayNetwork() (network.Network, error) {
	return new(terwayNetwork), nil
}
// Diagnose delegates to the configured diagnostor for the path from src to
// dst, using the protocol taken from the context's task configuration. The
// diagnostor's results are returned unchanged.
func (n *flannelNetwork) Diagnose(ctx *ctx.Context, src model.Endpoint, dst model.Endpoint) ([]model.Suspicion, *model.PacketPath, error) {
	proto := model.Protocol(ctx.TaskConfig().Protocol)
	return n.diagnostor.Diagnose(src, dst, proto)
}
// Diagnose delegates to the configured diagnostor for the path from src to
// dst, using the protocol taken from the context's task configuration. The
// diagnostor's results are returned unchanged.
func (n *calicoNetwork) Diagnose(ctx *ctx.Context, src model.Endpoint, dst model.Endpoint) ([]model.Suspicion, *model.PacketPath, error) {
	proto := model.Protocol(ctx.TaskConfig().Protocol)
	return n.diagnostor.Diagnose(src, dst, proto)
}
// Diagnose is not implemented for the Terway network yet. It ignores all of
// its arguments and always returns nil results together with a non-nil error,
// so that callers can report the missing feature through the normal error
// path instead of the process crashing on a panic.
func (n *terwayNetwork) Diagnose(_ *ctx.Context, _ model.Endpoint, _ model.Endpoint) ([]model.Suspicion, *model.PacketPath, error) {
	// todo: implement me
	return nil, nil, fmt.Errorf("diagnose is not implemented for terway network")
}