pilot/pkg/config/kube/crdclient/client.go (437 lines of code) (raw):

// Copyright Istio Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package crdclient provides an implementation of the config store and cache // using Kubernetes Custom Resources and the informer framework from Kubernetes // // This code relies heavily on code generation for performance reasons; to implement the // Istio store interface, we need to take dynamic inputs. Using the dynamic informers results in poor // performance, as the cache will store unstructured objects which need to be marshaled on each Get/List call. // Using istio/client-go directly will cache objects marshaled, allowing us to have cheap Get/List calls, // at the expense of some code gen. package crdclient import ( "context" "errors" "fmt" "sync" "time" ) import ( "github.com/cenkalti/backoff/v4" jsonmerge "github.com/evanphx/json-patch/v5" "github.com/hashicorp/go-multierror" "go.uber.org/atomic" "gomodules.xyz/jsonpatch/v3" istioclient "istio.io/client-go/pkg/clientset/versioned" "istio.io/pkg/log" crd "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" klabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/client-go/informers" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" _ "k8s.io/client-go/plugin/pkg/client/auth/oidc" "k8s.io/client-go/tools/cache" gatewayapiclient "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned" ) import ( "github.com/apache/dubbo-go-pixiu/pilot/pkg/features" "github.com/apache/dubbo-go-pixiu/pilot/pkg/model" "github.com/apache/dubbo-go-pixiu/pkg/config" "github.com/apache/dubbo-go-pixiu/pkg/config/schema/collection" "github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections" "github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk" "github.com/apache/dubbo-go-pixiu/pkg/kube" "github.com/apache/dubbo-go-pixiu/pkg/queue" ) var scope = log.RegisterScope("kube", "Kubernetes client messages", 0) // Client is a client for Istio CRDs, implementing config store cache // This is used for CRUD operators on Istio configuration, as well as handling of events on config changes type Client struct { // schemas defines the set of schemas used by this client. // Note: this must be a subset of the schemas defined in the codegen schemas collection.Schemas // domainSuffix for the config metadata domainSuffix string // revision for this control plane instance. We will only read configs that match this revision. revision string // kinds keeps track of all cache handlers for known types kinds map[config.GroupVersionKind]*cacheHandler kindsMu sync.RWMutex queue queue.Instance // handlers defines a list of event handlers per-type handlers map[config.GroupVersionKind][]model.EventHandler // The istio/client-go client we will use to access objects istioClient istioclient.Interface // The gateway-api client we will use to access objects gatewayAPIClient gatewayapiclient.Interface // beginSync is set to true when calling SyncAll, it indicates the controller has began sync resources. beginSync *atomic.Bool // initialSync is set to true after performing an initial processing of all objects. initialSync *atomic.Bool schemasByCRDName map[string]collection.Schema client kube.Client crdMetadataInformer cache.SharedIndexInformer } var _ model.ConfigStoreController = &Client{} func New(client kube.Client, revision, domainSuffix string) (model.ConfigStoreController, error) { schemas := collections.Pilot if features.EnableGatewayAPI { schemas = collections.PilotGatewayAPI } return NewForSchemas(client, revision, domainSuffix, schemas) } var crdWatches = map[config.GroupVersionKind]*waiter{ gvk.KubernetesGateway: newWaiter(), gvk.GatewayClass: newWaiter(), } type waiter struct { once sync.Once stop chan struct{} } func newWaiter() *waiter { return &waiter{ once: sync.Once{}, stop: make(chan struct{}), } } // WaitForCRD waits until the request CRD exists, and returns true on success. A false return value // indicates the CRD does not exist but the wait failed or was canceled. // This is useful to conditionally enable controllers based on CRDs being created. func WaitForCRD(k config.GroupVersionKind, stop <-chan struct{}) bool { ch, f := crdWatches[k] if !f { log.Warnf("waiting for CRD that is not registered") return false } select { case <-stop: return false case <-ch.stop: return true } } func NewForSchemas(client kube.Client, revision, domainSuffix string, schemas collection.Schemas) (model.ConfigStoreController, error) { schemasByCRDName := map[string]collection.Schema{} for _, s := range schemas.All() { // From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>." name := fmt.Sprintf("%s.%s", s.Resource().Plural(), s.Resource().Group()) schemasByCRDName[name] = s } out := &Client{ domainSuffix: domainSuffix, schemas: schemas, schemasByCRDName: schemasByCRDName, revision: revision, queue: queue.NewQueue(1 * time.Second), kinds: map[config.GroupVersionKind]*cacheHandler{}, handlers: map[config.GroupVersionKind][]model.EventHandler{}, client: client, istioClient: client.Istio(), gatewayAPIClient: client.GatewayAPI(), crdMetadataInformer: client.MetadataInformer().ForResource(collections.K8SApiextensionsK8SIoV1Customresourcedefinitions.Resource(). GroupVersionResource()).Informer(), beginSync: atomic.NewBool(false), initialSync: atomic.NewBool(false), } known, err := knownCRDs(client.Ext()) if err != nil { return nil, err } for _, s := range schemas.All() { // From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>." name := fmt.Sprintf("%s.%s", s.Resource().Plural(), s.Resource().Group()) crd := true if _, f := collections.Builtin.Find(s.Name().String()); f { crd = false } if !crd { handleCRDAdd(out, name, nil) } else { if _, f := known[name]; f { handleCRDAdd(out, name, nil) } else { scope.Warnf("Skipping CRD %v as it is not present", s.Resource().GroupVersionKind()) } } } return out, nil } // Validate we are ready to handle events. Until the informers are synced, we will block the queue func (cl *Client) checkReadyForEvents(curr interface{}) error { if !cl.informerSynced() { return errors.New("waiting till full synchronization") } _, err := cache.DeletionHandlingMetaNamespaceKeyFunc(curr) if err != nil { scope.Infof("Error retrieving key: %v", err) } return nil } func (cl *Client) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) { cl.handlers[kind] = append(cl.handlers[kind], handler) } func (cl *Client) SetWatchErrorHandler(handler func(r *cache.Reflector, err error)) error { var errs error for _, h := range cl.allKinds() { if err := h.informer.SetWatchErrorHandler(handler); err != nil { errs = multierror.Append(errs, err) } } return errs } // Run the queue and all informers. Callers should wait for HasSynced() before depending on results. func (cl *Client) Run(stop <-chan struct{}) { t0 := time.Now() scope.Info("Starting Pilot K8S CRD controller") if !cache.WaitForCacheSync(stop, cl.informerSynced) { scope.Error("Failed to sync Pilot K8S CRD controller cache") return } cl.SyncAll() cl.initialSync.Store(true) scope.Info("Pilot K8S CRD controller synced ", time.Since(t0)) cl.crdMetadataInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { crd, ok := obj.(*metav1.PartialObjectMetadata) if !ok { // Shouldn't happen scope.Errorf("wrong type %T: %v", obj, obj) return } handleCRDAdd(cl, crd.Name, stop) }, UpdateFunc: nil, DeleteFunc: nil, }) cl.queue.Run(stop) scope.Info("controller terminated") } func (cl *Client) informerSynced() bool { for _, ctl := range cl.allKinds() { if !ctl.informer.HasSynced() { scope.Infof("controller %q is syncing...", ctl.schema.Resource().GroupVersionKind()) return false } } return true } func (cl *Client) HasSynced() bool { return cl.initialSync.Load() } // SyncAll syncs all the objects during bootstrap to make the configs updated to caches func (cl *Client) SyncAll() { cl.beginSync.Store(true) wg := sync.WaitGroup{} for _, h := range cl.allKinds() { handlers := cl.handlers[h.schema.Resource().GroupVersionKind()] if len(handlers) == 0 { continue } h := h wg.Add(1) go func() { defer wg.Done() objects := h.informer.GetIndexer().List() for _, object := range objects { currItem, ok := object.(runtime.Object) if !ok { scope.Warnf("New Object can not be converted to runtime Object %v, is type %T", object, object) continue } currConfig := TranslateObject(currItem, h.schema.Resource().GroupVersionKind(), h.client.domainSuffix) for _, f := range handlers { f(config.Config{}, currConfig, model.EventAdd) } } }() } wg.Wait() } // Schemas for the store func (cl *Client) Schemas() collection.Schemas { return cl.schemas } // Get implements store interface func (cl *Client) Get(typ config.GroupVersionKind, name, namespace string) *config.Config { h, f := cl.kind(typ) if !f { scope.Warnf("unknown type: %s", typ) return nil } obj, err := h.lister(namespace).Get(name) if err != nil { // TODO we should be returning errors not logging scope.Warnf("error on get %v/%v: %v", name, namespace, err) return nil } cfg := TranslateObject(obj, typ, cl.domainSuffix) if !cl.objectInRevision(&cfg) { return nil } return &cfg } // Create implements store interface func (cl *Client) Create(cfg config.Config) (string, error) { if cfg.Spec == nil { return "", fmt.Errorf("nil spec for %v/%v", cfg.Name, cfg.Namespace) } meta, err := create(cl.istioClient, cl.gatewayAPIClient, cfg, getObjectMetadata(cfg)) if err != nil { return "", err } return meta.GetResourceVersion(), nil } // Update implements store interface func (cl *Client) Update(cfg config.Config) (string, error) { if cfg.Spec == nil { return "", fmt.Errorf("nil spec for %v/%v", cfg.Name, cfg.Namespace) } meta, err := update(cl.istioClient, cl.gatewayAPIClient, cfg, getObjectMetadata(cfg)) if err != nil { return "", err } return meta.GetResourceVersion(), nil } func (cl *Client) UpdateStatus(cfg config.Config) (string, error) { if cfg.Status == nil { return "", fmt.Errorf("nil status for %v/%v on updateStatus()", cfg.Name, cfg.Namespace) } meta, err := updateStatus(cl.istioClient, cl.gatewayAPIClient, cfg, getObjectMetadata(cfg)) if err != nil { return "", err } return meta.GetResourceVersion(), nil } // Patch applies only the modifications made in the PatchFunc rather than doing a full replace. Useful to avoid // read-modify-write conflicts when there are many concurrent-writers to the same resource. func (cl *Client) Patch(orig config.Config, patchFn config.PatchFunc) (string, error) { modified, patchType := patchFn(orig.DeepCopy()) meta, err := patch(cl.istioClient, cl.gatewayAPIClient, orig, getObjectMetadata(orig), modified, getObjectMetadata(modified), patchType) if err != nil { return "", err } return meta.GetResourceVersion(), nil } // Delete implements store interface // `resourceVersion` must be matched before deletion is carried out. If not possible, a 409 Conflict status will be func (cl *Client) Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error { return delete(cl.istioClient, cl.gatewayAPIClient, typ, name, namespace, resourceVersion) } // List implements store interface func (cl *Client) List(kind config.GroupVersionKind, namespace string) ([]config.Config, error) { h, f := cl.kind(kind) if !f { return nil, nil } list, err := h.lister(namespace).List(klabels.Everything()) if err != nil { return nil, err } out := make([]config.Config, 0, len(list)) for _, item := range list { cfg := TranslateObject(item, kind, cl.domainSuffix) if cl.objectInRevision(&cfg) { out = append(out, cfg) } } return out, err } func (cl *Client) objectInRevision(o *config.Config) bool { return config.ObjectInRevision(o, cl.revision) } func (cl *Client) allKinds() []*cacheHandler { cl.kindsMu.RLock() defer cl.kindsMu.RUnlock() ret := make([]*cacheHandler, 0, len(cl.kinds)) for _, k := range cl.kinds { ret = append(ret, k) } return ret } func (cl *Client) kind(r config.GroupVersionKind) (*cacheHandler, bool) { cl.kindsMu.RLock() defer cl.kindsMu.RUnlock() ch, ok := cl.kinds[r] return ch, ok } // knownCRDs returns all CRDs present in the cluster, with timeout and retries. func knownCRDs(crdClient apiextensionsclient.Interface) (map[string]struct{}, error) { var res *crd.CustomResourceDefinitionList b := backoff.NewExponentialBackOff() b.InitialInterval = time.Second b.MaxElapsedTime = time.Minute err := backoff.Retry(func() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() var err error res, err = crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{}) if err == nil { return nil } scope.Errorf("failed to list CRDs: %v", err) return err }, b) if err != nil { return nil, err } mp := map[string]struct{}{} for _, r := range res.Items { mp[r.Name] = struct{}{} } return mp, nil } func TranslateObject(r runtime.Object, gvk config.GroupVersionKind, domainSuffix string) config.Config { translateFunc, f := translationMap[gvk] if !f { scope.Errorf("unknown type %v", gvk) return config.Config{} } c := translateFunc(r) c.Domain = domainSuffix return c } func getObjectMetadata(config config.Config) metav1.ObjectMeta { return metav1.ObjectMeta{ Name: config.Name, Namespace: config.Namespace, Labels: config.Labels, Annotations: config.Annotations, ResourceVersion: config.ResourceVersion, OwnerReferences: config.OwnerReferences, UID: types.UID(config.UID), } } func genPatchBytes(oldRes, modRes runtime.Object, patchType types.PatchType) ([]byte, error) { oldJSON, err := json.Marshal(oldRes) if err != nil { return nil, fmt.Errorf("failed marhsalling original resource: %v", err) } newJSON, err := json.Marshal(modRes) if err != nil { return nil, fmt.Errorf("failed marhsalling modified resource: %v", err) } switch patchType { case types.JSONPatchType: ops, err := jsonpatch.CreatePatch(oldJSON, newJSON) if err != nil { return nil, err } return json.Marshal(ops) case types.MergePatchType: return jsonmerge.CreateMergePatch(oldJSON, newJSON) default: return nil, fmt.Errorf("unsupported patch type: %v. must be one of JSONPatchType or MergePatchType", patchType) } } func handleCRDAdd(cl *Client, name string, stop <-chan struct{}) { scope.Debugf("adding CRD %q", name) s, f := cl.schemasByCRDName[name] if !f { scope.Debugf("added resource that we are not watching: %v", name) return } resourceGVK := s.Resource().GroupVersionKind() gvr := s.Resource().GroupVersionResource() cl.kindsMu.Lock() defer cl.kindsMu.Unlock() if _, f := cl.kinds[resourceGVK]; f { scope.Debugf("added resource that already exists: %v", resourceGVK) return } var i informers.GenericInformer var ifactory starter var err error switch s.Resource().Group() { case gvk.KubernetesGateway.Group: ifactory = cl.client.GatewayAPIInformer() i, err = cl.client.GatewayAPIInformer().ForResource(gvr) case gvk.Pod.Group, gvk.Deployment.Group, gvk.MutatingWebhookConfiguration.Group: ifactory = cl.client.KubeInformer() i, err = cl.client.KubeInformer().ForResource(gvr) case gvk.CustomResourceDefinition.Group: ifactory = cl.client.ExtInformer() i, err = cl.client.ExtInformer().ForResource(gvr) default: ifactory = cl.client.IstioInformer() i, err = cl.client.IstioInformer().ForResource(gvr) } if err != nil { // Shouldn't happen scope.Errorf("failed to create informer for %v: %v", resourceGVK, err) return } cl.kinds[resourceGVK] = createCacheHandler(cl, s, i) if w, f := crdWatches[resourceGVK]; f { scope.Infof("notifying watchers %v was created", resourceGVK) w.once.Do(func() { close(w.stop) }) } if stop != nil { // Start informer factory, only if stop is defined. In startup case, we will not start here as // we will start all factories once we are ready to initialize. // For dynamically added CRDs, we need to start immediately though ifactory.Start(stop) } } type starter interface { Start(stopCh <-chan struct{}) }