pkg/skoop/utils/k8s.go (144 lines of code) (raw):

package utils import ( "context" "fmt" "github.com/samber/lo" "golang.org/x/exp/slices" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) // NewConfig returns a new Kubernetes configuration object func NewConfig(kubeconfigPath string) (*rest.Config, *clientcmd.ClientConfig, error) { var err error var cc *rest.Config if kubeconfigPath == "" { return nil, nil, fmt.Errorf("kubeconfig path is invalid") } kubeconfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath}, &clientcmd.ConfigOverrides{}) cc, err = kubeconfig.ClientConfig() if err == nil { return cc, &kubeconfig, nil } kubeconfig = clientcmd.NewNonInteractiveDeferredLoadingClientConfig( clientcmd.NewDefaultClientConfigLoadingRules(), &clientcmd.ConfigOverrides{}) cc, err = kubeconfig.ClientConfig() if err == nil { return cc, &kubeconfig, nil } return nil, nil, fmt.Errorf("Failed to load Kubernetes config: %s", err) } func Normalize(objType string, obj interface{}) string { type normalize interface { GetNamespace() string GetName() string } objMeta, ok := obj.(normalize) if ok { return fmt.Sprintf("%s/%s/%s", objType, objMeta.GetNamespace(), objMeta.GetName()) } return "" } func DetectNetworkPlugin(k8sCli *kubernetes.Clientset) (networkMode string, err error) { dss, err := k8sCli.AppsV1().DaemonSets("").List(context.Background(), metav1.ListOptions{}) if err != nil { return "", err } for _, ds := range dss.Items { switch ds.Name { case "kube-flannel-ds": return "flannel", nil case "calico-node": return "calico", nil case "terway-eniip": return "terway-eniip", nil } } return "", nil } var kubeProxyConfigmaps = []string{"kube-proxy", "kube-proxy-worker"} func getKubeProxyConfigFromConfigMap(k8sCli *kubernetes.Clientset) (string, error) { var cm *v1.ConfigMap var err error for _, cmName := range kubeProxyConfigmaps { cm, err = k8sCli.CoreV1().ConfigMaps("kube-system").Get(context.Background(), cmName, metav1.GetOptions{}) if err != nil && !errors.IsNotFound(err) { return "", err } if err == nil { break } } if cm == nil { return "", fmt.Errorf("cannot find kube-proxy configmap") } return cm.Data["config.conf"], nil } func DetectKubeProxyMode(k8sCli *kubernetes.Clientset) (string, error) { conf, err := getKubeProxyConfigFromConfigMap(k8sCli) if err != nil { return "", err } if conf == "" { return "iptables", nil } cfg := struct { Mode string `yaml:"mode"` }{} err = yaml.Unmarshal([]byte(conf), &cfg) if err != nil { return "", err } if cfg.Mode == "" { return "iptables", nil } return cfg.Mode, nil } func DetectClusterCIDR(k8sCli *kubernetes.Clientset) (string, error) { conf, err := getKubeProxyConfigFromConfigMap(k8sCli) if err != nil { return "", err } cfg := struct { ClusterCIDR string `yaml:"clusterCIDR"` }{} err = yaml.Unmarshal([]byte(conf), &cfg) if err != nil { return "", err } return cfg.ClusterCIDR, nil } func GetOSFromNode(node *v1.Node) string { if os, ok := node.Labels["kubernetes.io/os"]; ok { return os } return node.Labels["beta.kubernetes.io/os"] } func ContainsLoadBalancerIP(svc *v1.Service, ip string) bool { if slices.Contains(svc.Spec.ExternalIPs, ip) { return true } if lo.ContainsBy(svc.Status.LoadBalancer.Ingress, func(ingress v1.LoadBalancerIngress) bool { return ingress.IP == ip }) { return true } return false } func ConvertToImagePullPolicy(policy string) (v1.PullPolicy, error) { policyMap := map[string]v1.PullPolicy{ "Always": v1.PullAlways, "IfNotPresent": v1.PullIfNotPresent, "Never": v1.PullNever, } if pullPolicy, exists := policyMap[policy]; exists { return pullPolicy, nil } return "", fmt.Errorf("invalid image pull policy: %s, valid options are: Always, IfNotPresent, Never", policy) }