pkg/k8s/k8s_client.go (149 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package k8s import ( "context" "flag" "log" "os" "strings" "github.com/GoogleCloudPlatform/cloud-run-mesh/pkg/mesh" authenticationv1 "k8s.io/api/authentication/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog" ) import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) var Debug = true type K8S struct { Mesh *mesh.KRun Client *kubernetes.Clientset } func K8SClient(kr *mesh.KRun) *kubernetes.Clientset { if k8s, ok := kr.TokenProvider.(*K8S); ok { return k8s.Client } return nil } // Init klog.InitFlags from an env (to avoid messing with the CLI of // the app). For example -v=9 lists full request content, -v=7 lists requests headers func init() { fs := &flag.FlagSet{} klog.InitFlags(fs) kf := strings.Split(os.Getenv("KLOG_FLAGS"), " ") fs.Parse(kf) } // initUsingKubeConfig uses KUBECONFIG or $HOME/.kube/config // to init the primary k8s cluster. // // error is set if KUBECONFIG is set or ~/.kube/config exists and // fail to load. If the file doesn't exist, err is nil. func (kr *K8S) initUsingKubeConfig() error { // Explicit kube config - use it kc := os.Getenv("KUBECONFIG") if kc == "" { kc = os.Getenv("HOME") + "/.kube/config" } if _, err := os.Stat(kc); err == nil { cf, err := clientcmd.LoadFromFile(kc) //config := clientcmd.NewNonInteractiveClientConfig(cf, cf.CurrentContext, nil, nil) if strings.HasPrefix(cf.CurrentContext, "gke_") { parts := strings.Split(cf.CurrentContext, "_") if len(parts) > 3 { // TODO: if env variable with cluster name/location are set - use that for context kr.Mesh.ProjectId = parts[1] kr.Mesh.ClusterLocation = parts[2] kr.Mesh.ClusterName = parts[3] } } if strings.HasPrefix(cf.CurrentContext, "connectgateway_") { parts := strings.Split(cf.CurrentContext, "_") if len(parts) > 2 { // TODO: if env variable with cluster name/location are set - use that for context kr.Mesh.ProjectId = parts[1] kr.Mesh.ClusterName = parts[2] } } config, err := clientcmd.BuildConfigFromFlags("", kc) if err != nil { return err } kr.Client, err = kubernetes.NewForConfig(config) if err != nil { return err } if Debug { log.Println("Using Kubeconfig", cf.CurrentContext, kc) } return nil } return nil } func (kr *K8S) initInCluster() error { if kr.Client != nil { return nil } hostInClustser := os.Getenv("KUBERNETES_SERVICE_HOST") if hostInClustser == "" { return nil } config, err := rest.InClusterConfig() if err != nil { panic(err) } kr.Client, err = kubernetes.NewForConfig(config) if err != nil { return err } if Debug { log.Println("Using in-cluster k8s ", hostInClustser) } kr.Mesh.InCluster = true return nil } // K8SClient will discover a K8S config cluster and return the client func (kr *K8S) K8SClient(ctx context.Context) error { if kr.Client != nil { return nil } err := kr.initUsingKubeConfig() if err != nil { return err } err = kr.initInCluster() if err != nil { return err } return nil } // LoadConfig gets the default k8s client, using environment // variables to decide how: // // - KUBECONFIG or $HOME/.kube/config will be tried first // - GKE is checked - using env or metadata server to get // PROJECT_ID, CLUSTER_LOCATION, CLUSTER_NAME (if not set), and // construct a kube config to use. // - (in future other vendor-specific methods may be added) // - finally in-cluster will be checked. // // Once the cluster is found, additional config can be loaded from // the cluster. // Read with Secrets and ConfigMaps func (kr *K8S) GetCM(ctx context.Context, ns string, name string) (map[string]string, error) { s, err := kr.Client.CoreV1().ConfigMaps(ns).Get(ctx, name, metav1.GetOptions{}) if err != nil { if Is404(err) { err = nil } return map[string]string{}, err } return s.Data, nil } func (kr *K8S) GetSecret(ctx context.Context, ns string, name string) (map[string][]byte, error) { s, err := kr.Client.CoreV1().Secrets(ns).Get(ctx, name, metav1.GetOptions{}) if err != nil { if Is404(err) { err = nil } return map[string][]byte{}, err } return s.Data, nil } func Is404(err error) bool { if se, ok := err.(*k8serrors.StatusError); ok { if se.ErrStatus.Code == 404 { return true } } return false } // GetToken returns a token with the given audience for the current KSA, using CreateToken request. // Used by the STS token exchanger. func (kr *K8S) GetToken(ctx context.Context, aud string) (string, error) { treq := &authenticationv1.TokenRequest{ Spec: authenticationv1.TokenRequestSpec{ Audiences: []string{aud}, }, } ts, err := kr.Client.CoreV1().ServiceAccounts(kr.Mesh.Namespace).CreateToken(ctx, kr.Mesh.KSA, treq, metav1.CreateOptions{}) if err != nil { return "", err } return ts.Status.Token, nil }