pkg/cloud_provider/clientset/clientset.go (193 lines of code) (raw):

/* Copyright 2018 The Kubernetes Authors. Copyright 2022 Google LLC Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package clientset import ( "context" "errors" "fmt" "time" "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/webhook" authenticationv1 "k8s.io/api/authentication/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" ) type Interface interface { ConfigurePodLister(nodeName string) ConfigureNodeLister(nodeName string) GetPod(namespace, name string) (*corev1.Pod, error) CreateServiceAccountToken(ctx context.Context, namespace, name string, tokenRequest *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) GetGCPServiceAccountName(ctx context.Context, namespace, name string) (string, error) GetNode(name string) (*corev1.Node, error) } type PodInfo struct { Name string Namespace string } type Clientset struct { k8sClients kubernetes.Interface podLister listersv1.PodLister nodeLister listersv1.NodeLister informerResyncDurationSec int } const GkeMetaDataServerKey = "iam.gke.io/gke-metadata-server-enabled" func (c *Clientset) ConfigureNodeLister(nodeName string) { trim := func(obj interface{}) (interface{}, error) { if accessor, err := meta.Accessor(obj); err == nil { if accessor.GetManagedFields() != nil { accessor.SetManagedFields(nil) } } // We are filtering only for relevant Node annotations to optimize memory usage. // Relevant info is for NodePublishVolume calls: // https://github.com/GoogleCloudPlatform/gcs-fuse-csi-driver/blob/547cab9a9aea4cdbda581885880020fb9266dc03/pkg/csi_driver/node.go#L85 nodeObj, ok := obj.(*corev1.Node) if !ok { return obj, nil } newLabels := map[string]string{} isGkeMetaDataServerEnabled, ok := nodeObj.ObjectMeta.Labels[GkeMetaDataServerKey] if ok { newLabels[GkeMetaDataServerKey] = isGkeMetaDataServerEnabled } nodeObj.Spec = corev1.NodeSpec{} nodeObj.Status = corev1.NodeStatus{} nodeObj.ObjectMeta.Annotations = nil nodeObj.ObjectMeta.Labels = newLabels return obj, nil } informerFactory := informers.NewSharedInformerFactoryWithOptions( c.k8sClients, time.Duration(c.informerResyncDurationSec)*time.Second, informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.FieldSelector = "metadata.name=" + nodeName }), informers.WithTransform(trim), ) nodeLister := informerFactory.Core().V1().Nodes().Lister() ctx := context.Background() informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) c.nodeLister = nodeLister } func New(kubeconfigPath string, informerResyncDurationSec int) (Interface, error) { var err error var rc *rest.Config if kubeconfigPath != "" { klog.V(4).Infof("using kubeconfig path %q", kubeconfigPath) rc, err = clientcmd.BuildConfigFromFlags("", kubeconfigPath) } else { klog.V(4).Info("using in-cluster kubeconfig") rc, err = rest.InClusterConfig() } if err != nil { return nil, fmt.Errorf("failed to read kubeconfig: %w", err) } rc.ContentType = runtime.ContentTypeProtobuf clientset, err := kubernetes.NewForConfig(rc) if err != nil { return nil, fmt.Errorf("failed to configure k8s client: %w", err) } return &Clientset{k8sClients: clientset, informerResyncDurationSec: informerResyncDurationSec}, nil } func (c *Clientset) ConfigurePodLister(nodeName string) { trim := func(obj interface{}) (interface{}, error) { if accessor, err := meta.Accessor(obj); err == nil { if accessor.GetManagedFields() != nil { accessor.SetManagedFields(nil) } } // We are filtering only for relevant PodSpec info to optimize memory usage. // Relevant info is for NodePublishVolume calls: // https://github.com/GoogleCloudPlatform/gcs-fuse-csi-driver/blob/547cab9a9aea4cdbda581885880020fb9266dc03/pkg/csi_driver/node.go#L85 podObj, ok := obj.(*corev1.Pod) if !ok { return obj, nil } var newContainers []corev1.Container for _, cont := range podObj.Spec.Containers { container := corev1.Container{ Name: cont.Name, SecurityContext: cont.SecurityContext, VolumeMounts: cont.VolumeMounts, } newContainers = append(newContainers, container) } var newInitContainers []corev1.Container for _, cont := range podObj.Spec.InitContainers { if cont.Name == webhook.GcsFuseSidecarName { newInitContainers = append(newInitContainers, cont) continue } container := corev1.Container{ Name: cont.Name, SecurityContext: cont.SecurityContext, VolumeMounts: cont.VolumeMounts, } newInitContainers = append(newInitContainers, container) } nodeName := podObj.Spec.NodeName volumes := podObj.Spec.Volumes restartPolicy := podObj.Spec.RestartPolicy hostNetwork := podObj.Spec.HostNetwork podObj.Spec = corev1.PodSpec{ NodeName: nodeName, Volumes: volumes, Containers: newContainers, InitContainers: newInitContainers, RestartPolicy: restartPolicy, HostNetwork: hostNetwork, } return obj, nil } informerFactory := informers.NewSharedInformerFactoryWithOptions( c.k8sClients, time.Duration(c.informerResyncDurationSec)*time.Second, informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.FieldSelector = "spec.nodeName=" + nodeName }), informers.WithTransform(trim), ) podLister := informerFactory.Core().V1().Pods().Lister() ctx := context.Background() informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) c.podLister = podLister } func (c *Clientset) GetPod(namespace, name string) (*corev1.Pod, error) { if c.podLister == nil { return nil, errors.New("pod informer is not ready") } return c.podLister.Pods(namespace).Get(name) } func (c *Clientset) GetNode(name string) (*corev1.Node, error) { if c.nodeLister == nil { return nil, errors.New("node informer is not ready") } return c.nodeLister.Get(name) } func (c *Clientset) CreateServiceAccountToken(ctx context.Context, namespace, name string, tokenRequest *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) { resp, err := c.k8sClients. CoreV1(). ServiceAccounts(namespace). CreateToken( ctx, name, tokenRequest, metav1.CreateOptions{}, ) return resp, err } func (c *Clientset) GetGCPServiceAccountName(ctx context.Context, namespace, name string) (string, error) { resp, err := c.k8sClients. CoreV1(). ServiceAccounts(namespace). Get( ctx, name, metav1.GetOptions{}, ) if err != nil { return "", fmt.Errorf("failed to call Kubernetes ServiceAccount.Get API: %w", err) } return resp.Annotations["iam.gke.io/gcp-service-account"], nil }