internal/util/k8s.go (172 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // package util import ( "bytes" "context" "os" "path/filepath" "strings" apiv1 "k8s.io/api/admission/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer/yaml" yamlutil "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/discovery" "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/clientcmd" "github.com/apache/skywalking-infra-e2e/internal/logger" ) // K8sClusterInfo created when connect to cluster type K8sClusterInfo struct { Client *kubernetes.Clientset Interface dynamic.Interface restConfig *rest.Config namespace string } // ConnectToK8sCluster gets clientSet and dynamic client from k8s config file. func ConnectToK8sCluster(kubeConfigPath string) (info *K8sClusterInfo, err error) { config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath) if err != nil { return nil, err } c, err := kubernetes.NewForConfig(config) if err != nil { return nil, err } dc, err := dynamic.NewForConfig(config) if err != nil { return nil, err } kubeConfigYaml, err := os.ReadFile(kubeConfigPath) if err != nil { return nil, err } restConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeConfigYaml) if err != nil { return nil, err } logger.Log.Info("connect to k8s cluster succeeded") return &K8sClusterInfo{c, dc, restConfig, ""}, nil } func (c *K8sClusterInfo) CopyClusterToNamespace(namespace string) *K8sClusterInfo { return &K8sClusterInfo{ Client: c.Client, Interface: c.Interface, restConfig: c.restConfig, namespace: namespace, } } func (c *K8sClusterInfo) ToRESTConfig() (*rest.Config, error) { return c.restConfig, nil } func (c *K8sClusterInfo) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) { config, err := c.ToRESTConfig() if err != nil { return nil, err } config.Burst = 100 discoveryClient, _ := discovery.NewDiscoveryClientForConfig(config) return memory.NewMemCacheClient(discoveryClient), nil } func (c *K8sClusterInfo) ToRESTMapper() (meta.RESTMapper, error) { discoveryClient, err := c.ToDiscoveryClient() if err != nil { return nil, err } mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) expander := restmapper.NewShortcutExpander(mapper, discoveryClient) return expander, nil } func (c *K8sClusterInfo) ToRawKubeConfigLoader() clientcmd.ClientConfig { loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig overrides := &clientcmd.ConfigOverrides{ClusterDefaults: clientcmd.ClusterDefaults} overrides.Context.Namespace = c.namespace return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides) } // GetManifests recursively gets all yml and yaml files from manifests string. func GetManifests(manifests string) (files []string, err error) { s := make([]string, 0) files = strings.Split(manifests, ",") // file or directory for _, f := range files { f = ResolveAbs(f) fi, err := os.Stat(f) if err != nil { return nil, err } switch mode := fi.Mode(); { case mode.IsDir(): err := filepath.Walk(f, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if strings.HasSuffix(path, ".yml") || strings.HasSuffix(path, ".yaml") { path = ResolveAbs(path) s = append(s, path) } return nil }) if err != nil { return nil, err } case mode.IsRegular(): filename := fi.Name() if strings.HasSuffix(filename, ".yml") || strings.HasSuffix(filename, ".yaml") { s = append(s, f) } } } return s, nil } // OperateManifest operates manifest in k8s cluster which kind created. func OperateManifest(c *kubernetes.Clientset, dc dynamic.Interface, manifest string, operation apiv1.Operation) error { b, err := os.ReadFile(manifest) if err != nil { return err } decoder := yamlutil.NewYAMLOrJSONDecoder(bytes.NewReader(b), 100) for { var rawObj runtime.RawExtension if err = decoder.Decode(&rawObj); err != nil { break } obj, gvk, err := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, nil) if err != nil { return err } unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { return err } unstructuredObj := &unstructured.Unstructured{Object: unstructuredMap} apiGroupResource, err := restmapper.GetAPIGroupResources(c.Discovery()) if err != nil { return err } mapper := restmapper.NewDiscoveryRESTMapper(apiGroupResource) mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { return err } var dri dynamic.ResourceInterface if mapping.Scope.Name() == meta.RESTScopeNameNamespace { if unstructuredObj.GetNamespace() == "" { unstructuredObj.SetNamespace(metav1.NamespaceDefault) } dri = dc.Resource(mapping.Resource).Namespace(unstructuredObj.GetNamespace()) } else { dri = dc.Resource(mapping.Resource) } switch operation { case apiv1.Create: _, err = dri.Create(context.Background(), unstructuredObj, metav1.CreateOptions{}) case apiv1.Delete: err = dri.Delete(context.Background(), unstructuredObj.GetName(), metav1.DeleteOptions{}) } if err != nil { return err } } return nil }