packages/kn-plugin-workflow/pkg/common/k8sclient/goapi.go (426 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package k8sclient import ( "context" "fmt" "io" "log" "net/http" "os" "path/filepath" "strings" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd/api" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" ) type GoAPI struct{} var selfSubjectAccessGVR = schema.GroupVersionResource{ Group: "authorization.k8s.io", Version: "v1", Resource: "selfsubjectaccessreviews", } var kubeconfigInfoDisplayed = false func (m GoAPI) IsCreateAllowed(resourcePath string, namespace string) (bool, error) { dynamicClient, err := DynamicClient() if err != nil { return false, fmt.Errorf("❌ ERROR: Failed to create dynamic Kubernetes client: %v", err) } if resources, err := ParseYamlFile(resourcePath); err != nil { return false, fmt.Errorf("❌ ERROR: Failed to parse YAML file: %v", err) } else { for _, resource := range resources { sar := parseResource(resource, namespace) result, err := dynamicClient.Resource(selfSubjectAccessGVR).Create(context.TODO(), sar, metav1.CreateOptions{}) if err != nil { return false, fmt.Errorf("failed to perform access review: %v", err) } allowed, _, _ := unstructured.NestedBool(result.Object, "status", "allowed") return allowed, nil } } return false, nil } func (m GoAPI) IsDeleteAllowed(name string, namespace string) error { dynamicClient, err := DynamicClient() if err != nil { return fmt.Errorf("❌ ERROR: Failed to create dynamic Kubernetes client: %v", err) } err = dynamicClient.Resource(selfSubjectAccessGVR).Namespace(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) if err != nil { return fmt.Errorf("failed to perform access review: %v", err) } return nil } func (m GoAPI) GetCurrentNamespace() (string, error) { return GetCurrentNamespace() } func (m GoAPI) GetNamespace(namespace string) (*corev1.Namespace, error) { config, err := KubeRestConfig() if err != nil { return nil, fmt.Errorf("❌ ERROR: Failed to create rest config for Kubernetes client: %v", err) } clientSet, err := kubernetes.NewForConfig(config) if err != nil { return nil, fmt.Errorf("❌ ERROR: Failed to create k8s client: %v", err) } ns, err := clientSet.CoreV1().Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{}) if err != nil { return nil, fmt.Errorf("❌ ERROR: Failed to get namespace: %v", err) } return ns, nil } func (m GoAPI) CheckContext() (string, error) { config, err := KubeApiConfig() if err != nil { return "", fmt.Errorf("❌ ERROR: No current k8s context found %w", err) } context := config.CurrentContext if context == "" { return "", fmt.Errorf("❌ ERROR: No current k8s context found") } fmt.Printf(" - ✅ k8s current context: %s\n", context) return context, nil } func (m GoAPI) ExecuteApply(path, namespace string) error { client, err := DynamicClient() if err != nil { return fmt.Errorf("❌ ERROR: Failed to create dynamic Kubernetes client: %v", err) } fmt.Printf("🔨 Applying YAML file %s\n", path) if namespace == "" { currentNamespace, err := m.GetCurrentNamespace() if err != nil { return fmt.Errorf("❌ ERROR: Failed to get current namespace: %w", err) } namespace = currentNamespace } if resources, err := ParseYamlFile(path); err != nil { return fmt.Errorf("❌ ERROR: Failed to parse YAML file: %v", err) } else { created := make([]unstructured.Unstructured, 0, len(resources)) for _, resource := range resources { gvk := resource.GroupVersionKind() gvr, _ := meta.UnsafeGuessKindToResource(gvk) if resource.GetNamespace() != "" && namespace != resource.GetNamespace() { return fmt.Errorf("❌ ERROR: the namespace from the provided object \"%s\" does not match"+ " the namespace \"%s\". You must pass '--namespace=%s' to perform this operation.:", resource.GetNamespace(), namespace, resource.GetNamespace()) } _, err := client.Resource(gvr).Namespace(namespace).Create(context.Background(), &resource, metav1.CreateOptions{}) if err != nil { if errors.IsAlreadyExists(err) { existingResource, err := client.Resource(gvr).Namespace(namespace).Get(context.Background(), resource.GetName(), metav1.GetOptions{}) if err != nil { return fmt.Errorf("❌ ERROR: Failed to get existing resource: %v", err) } resource.SetResourceVersion(existingResource.GetResourceVersion()) _, err = client.Resource(gvr).Namespace(namespace).Update(context.Background(), &resource, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("❌ ERROR: Failed to update resource: %v", err) } } else { // rollback if err := doRollback(created, namespace, client); err != nil { return fmt.Errorf("❌ ERROR: Failed to rollback resource: %v", err) } return fmt.Errorf("❌ ERROR: Failed to create resource: %v", err) } } created = append(created, resource) } } return nil } func (m GoAPI) ExecuteCreate(gvr schema.GroupVersionResource, object *unstructured.Unstructured, namespace string) (*unstructured.Unstructured, error) { dynamicClient, err := DynamicClient() if err != nil { return nil, fmt.Errorf("❌ ERROR: Failed to create dynamic Kubernetes client: %v", err) } resulted, err := dynamicClient.Resource(gvr).Namespace(namespace).Create(context.Background(), object, metav1.CreateOptions{}) if err != nil { if errors.IsAlreadyExists(err) { fmt.Printf("✅ Resource %q already exists\n", object.GetName()) } return nil, fmt.Errorf("❌ Failed to create resource: %v", err) } return resulted, nil } func (m GoAPI) ExecuteGet(gvr schema.GroupVersionResource, name string, namespace string) (*unstructured.Unstructured, error) { dynamicClient, err := DynamicClient() if err != nil { return nil, fmt.Errorf("❌ ERROR: Failed to create dynamic Kubernetes client: %v", err) } return dynamicClient.Resource(gvr).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) } func (m GoAPI) ExecuteDelete(path, namespace string) error { fmt.Println("🔨 Deleting resources...", path) if namespace == "" { currentNamespace, err := m.GetCurrentNamespace() if err != nil { return fmt.Errorf("❌ ERROR: Failed to get current namespace: %w", err) } namespace = currentNamespace } if resources, err := ParseYamlFile(path); err != nil { return fmt.Errorf("❌ ERROR: Failed to parse YAML file: %v", err) } else { for _, resource := range resources { gvk := resource.GroupVersionKind() gvr, _ := meta.UnsafeGuessKindToResource(gvk) err := m.ExecuteDeleteGVR(gvr, resource.GetName(), namespace) if err != nil { return fmt.Errorf("❌ ERROR: Failed to delete resource: %v", err) } } } return nil } func (m GoAPI) ExecuteDeleteGVR(gvr schema.GroupVersionResource, name string, namespace string) error { client, err := DynamicClient() if err != nil { return fmt.Errorf("❌ ERROR: Failed to create dynamic Kubernetes client: %v", err) } deletePolicy := metav1.DeletePropagationForeground err = client.Resource(gvr).Namespace(namespace).Delete(context.Background(), name, metav1.DeleteOptions{ PropagationPolicy: &deletePolicy, }) if err != nil { return fmt.Errorf("❌ ERROR: Failed to delete Resource: %w", err) } return nil } func (m GoAPI) CheckCrdExists(crd string) error { config, err := KubeRestConfig() if err != nil { return fmt.Errorf("❌ ERROR: Failed to create rest config for Kubernetes client: %v", err) } crdClientSet, err := clientset.NewForConfig(config) if err != nil { return fmt.Errorf("❌ ERROR: Failed to create k8s client: %v", err) } _, err = crdClientSet.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), crd, metav1.GetOptions{}) if err != nil { return err } return nil } func (m GoAPI) GetDeploymentStatus(namespace, deploymentName string) (v1.DeploymentStatus, error) { if namespace == "" { currentNamespace, err := m.GetCurrentNamespace() if err != nil { return v1.DeploymentStatus{}, fmt.Errorf("❌ ERROR: Failed to get current namespace: %w", err) } namespace = currentNamespace } config, err := KubeRestConfig() if err != nil { return v1.DeploymentStatus{}, fmt.Errorf("❌ ERROR: Failed to create rest config for Kubernetes client: %v", err) } newConfig, err := kubernetes.NewForConfig(config) if err != nil { return v1.DeploymentStatus{}, fmt.Errorf("❌ ERROR: Failed to create k8s client: %v", err) } deployments, err := newConfig.AppsV1().Deployments(namespace).List(context.TODO(), metav1.ListOptions{ LabelSelector: fmt.Sprintf("sonataflow.org/workflow-app=%s", deploymentName), }) if err != nil { return v1.DeploymentStatus{}, fmt.Errorf("❌ ERROR: Failed to get deployments: %v", err) } if len(deployments.Items) == 0 { return v1.DeploymentStatus{}, NoDeploymentFound } if len(deployments.Items) > 1 { return v1.DeploymentStatus{}, fmt.Errorf("❌ ERROR: More than one deployment named %s in namespace %s found", deploymentName, namespace) } return deployments.Items[0].Status, nil } func (m GoAPI) PortForward(namespace, serviceName, portFrom, portTo string, onReady func()) error { if namespace == "" { currentNamespace, err := m.GetCurrentNamespace() if err != nil { return fmt.Errorf("❌ ERROR: Failed to get current namespace: %w", err) } namespace = currentNamespace } config, err := KubeRestConfig() if err != nil { return fmt.Errorf("❌ ERROR: Failed to create rest config for Kubernetes client: %v", err) } clientSet, err := kubernetes.NewForConfig(config) service, err := clientSet.CoreV1().Services(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{}) if err != nil { return fmt.Errorf("❌ ERROR: Failed to get service: %v", err) } var labelSelector string for key, value := range service.Spec.Selector { if labelSelector != "" { labelSelector += "," } labelSelector += fmt.Sprintf("%s=%s", key, value) } pods, err := clientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{ LabelSelector: labelSelector, }) if err != nil { return fmt.Errorf("❌ ERROR: Failed to get pods: %v", err) } if len(pods.Items) == 0 { return fmt.Errorf("❌ ERROR: No pods found for service %s in namespace %s", serviceName, namespace) } req := clientSet.CoreV1().RESTClient().Post().Resource("pods").Namespace(pods.Items[0].Namespace). Name(pods.Items[0].Name).SubResource("portforward") transport, upgrader, err := spdy.RoundTripperFor(config) if err != nil { return fmt.Errorf("❌ ERROR: Failed to create round tripper: %v", err) } dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL()) errCh := make(chan error) stopCh := make(chan struct{}) readyCh := make(chan struct{}) ports := []string{fmt.Sprintf("%s:%s", portFrom, portTo)} go func() { forwardPorts, err := portforward.New(dialer, ports, stopCh, readyCh, io.Discard, os.Stderr); if err != nil { errCh <- err } err = forwardPorts.ForwardPorts() if err != nil { errCh <- err } }() select { case <-readyCh: onReady() case err := <-errCh: return fmt.Errorf("❌ Error starting port forwarding: %v\n", err) } <-stopCh return nil } func (m GoAPI) ExecuteList(gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, error) { client, err := DynamicClient() if err != nil { return nil, fmt.Errorf("❌ ERROR: Failed to create dynamic Kubernetes client: %v", err) } list, err := client.Resource(gvr).Namespace(namespace).List(context.Background(), metav1.ListOptions{}) if err != nil { return nil, fmt.Errorf("❌ ERROR: Failed to list resources: %v", err) } return list, nil } func KubeApiConfig() (*api.Config, error) { homeDir, err := os.UserHomeDir() if err != nil { return nil, fmt.Errorf("error getting user home dir: %w", err) } kubeConfigPath := filepath.Join(homeDir, ".kube", "config") if !kubeconfigInfoDisplayed { fmt.Printf("🔎 Using kubeconfig: %s\n", kubeConfigPath) kubeconfigInfoDisplayed = true } config, err := clientcmd.LoadFromFile(kubeConfigPath) if err != nil { return nil, fmt.Errorf("❌ ERROR: Failed to load kubeconfig: %w", err) } return config, nil } func KubeRestConfig() (*rest.Config, error) { config, err := rest.InClusterConfig() if err != nil { kubeConfig, err := KubeApiConfig() if err != nil { return nil, fmt.Errorf("❌ ERROR: Failed to load kubeconfig: %w", err) } clientConfig := clientcmd.NewDefaultClientConfig(*kubeConfig, &clientcmd.ConfigOverrides{}) restConfig, err := clientConfig.ClientConfig() if err != nil { log.Fatalf("Error converting to rest.Config: %v", err) } return restConfig, nil } return config, nil } var DynamicClient = func() (dynamic.Interface, error) { config, err := KubeRestConfig() if err != nil { return nil, fmt.Errorf("❌ ERROR: Failed to create rest config for Kubernetes client: %v", err) } dynamicClient, err := dynamic.NewForConfig(config) if err != nil { return nil, fmt.Errorf("❌ ERROR: Failed to create dynamic Kubernetes client: %v", err) } return dynamicClient, nil } var ParseYamlFile = func(path string) ([]unstructured.Unstructured, error) { data, err := os.ReadFile(path) if err != nil { return nil, fmt.Errorf("❌ ERROR: Failed to read YAML file: %w", err) } decoder := yaml.NewYAMLOrJSONDecoder(strings.NewReader(string(data)), 4096) result := []unstructured.Unstructured{} for { rawObj := &unstructured.Unstructured{} err := decoder.Decode(rawObj) if err != nil { break } result = append(result, *rawObj) } return result, nil } var GetCurrentNamespace = func() (string, error) { fmt.Println("🔎 Checking current namespace in k8s...") config, err := KubeApiConfig() if err != nil { return "", fmt.Errorf("❌ ERROR: Failed to get current k8s namespace: %w", err) } namespace := config.Contexts[config.CurrentContext].Namespace if len(namespace) == 0 { namespace = "default" } fmt.Printf(" - ✅ k8s current namespace: %s\n", namespace) return namespace, nil } func parseResource(resource unstructured.Unstructured, namespace string) *unstructured.Unstructured { gvk := resource.GroupVersionKind() gvr, _ := meta.UnsafeGuessKindToResource(gvk) sar := &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "authorization.k8s.io/v1", "kind": "SelfSubjectAccessReview", "spec": map[string]interface{}{ "resourceAttributes": map[string]interface{}{ "namespace": namespace, "verb": gvr.Version, "group": gvk.Group, "resource": gvr.Resource, }, }, }, } return sar } func doRollback(created []unstructured.Unstructured, applyNamespace string, client dynamic.Interface) error { for _, r := range created { gvk := r.GroupVersionKind() gvr, _ := meta.UnsafeGuessKindToResource(gvk) if r.GetNamespace() != "" { applyNamespace = r.GetNamespace() } if err := client.Resource(gvr).Namespace(applyNamespace).Delete(context.Background(), r.GetName(), metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { return fmt.Errorf("❌ ERROR: Failed to rollback resource: %v", err) } } return nil }