pkg/cli/preview/intercepting_kube_client.go (274 lines of code) (raw):

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package preview

import (
	"context"
	"fmt"
	"maps"
	"net/http"
	"slices"
	"sync"
	"sync/atomic"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/rest"
	"k8s.io/klog/v2"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// interceptingKubeClient is a Kubernetes client that intercepts Kubernetes API calls.
// It forwards read-only operations "upstream" to real Kubernetes.
// It returns an error on write operations, except status updates, which are
// recorded and reported as successful without being sent upstream.
type interceptingKubeClient struct {
	// upstreamRestConfig is the rest.Config of the real cluster, as passed to
	// newInterceptingKubeClient.
	upstreamRestConfig *rest.Config
	// upstreamClient performs the reads that are forwarded to the real cluster.
	upstreamClient *StreamingClient
	// upstreamRestMapper is the discovery-based REST mapper built from
	// upstreamRestConfig; it is shared by every client and cache created here.
	upstreamRestMapper meta.RESTMapper

	// recorder receives the blocked and ignored Kubernetes calls.
	recorder *Recorder
}

// newInterceptingKubeClient creates a new interceptingKubeClient.
func newInterceptingKubeClient(recorder *Recorder, upstreamRestConfig *rest.Config) (*interceptingKubeClient, error) { httpClient, err := rest.HTTPClientFor(upstreamRestConfig) if err != nil { return nil, fmt.Errorf("building http client: %w", err) } upstreamRestMapper, err := apiutil.NewDiscoveryRESTMapper(upstreamRestConfig, httpClient) if err != nil { return nil, fmt.Errorf("creating REST mapper: %w", err) } // TODO: Replace with rest.DefaultServerUrlFor baseURL, _, err := DefaultServerUrlFor(upstreamRestConfig) if err != nil { return nil, fmt.Errorf("getting base url: %w", err) } clientOptions := ClientOptions{ BaseURL: baseURL, HTTPClient: httpClient, } upstreamClient := NewStreamingClient(clientOptions) return &interceptingKubeClient{ upstreamRestConfig: upstreamRestConfig, upstreamClient: upstreamClient, upstreamRestMapper: upstreamRestMapper, recorder: recorder, }, nil } // MapperProvider returns the REST mapper for the interceptingKubeClient. func (c *interceptingKubeClient) MapperProvider(*rest.Config, *http.Client) (meta.RESTMapper, error) { // TODO: Verify restConfig or httpClient? return c.upstreamRestMapper, nil } // NewClient creates a controller-runtime client that intercepts Kubernetes API calls. 
func (c *interceptingKubeClient) NewClient(config *rest.Config, options client.Options) (client.Client, error) { typeStore := &typeStore{ restMapper: c.upstreamRestMapper, scheme: options.Scheme, } client := &interceptingControllerRuntimeClient{parent: c, typeStore: typeStore} if options.Cache == nil || options.Cache.Reader == nil { return client, nil } client.cache = options.Cache.Reader client.cacheUnstructured = options.Cache.Unstructured client.uncachedGVKs = make(map[schema.GroupVersionKind]struct{}) for _, obj := range options.Cache.DisableFor { gvk, err := apiutil.GVKForObject(obj, options.Scheme) if err != nil { return nil, err } client.uncachedGVKs[gvk] = struct{}{} } return client, nil } // NewCache creates a controller-runtime cache that intercepts Kubernetes API calls. func (c *interceptingKubeClient) NewCache(restConfig *rest.Config, opts cache.Options) (cache.Cache, error) { typeStore := &typeStore{ restMapper: c.upstreamRestMapper, scheme: opts.Scheme, } return newInterceptingControllerRuntimeCache(c.upstreamClient, typeStore) } // interceptingControllerRuntimeClient is a controller-runtime client that intercepts Kubernetes API calls. // It forwards read-only operations "upstream" to real Kubernetes. // It returns a BlockedKubeError on any write operations. type interceptingControllerRuntimeClient struct { parent *interceptingKubeClient typeStore *typeStore cache client.Reader uncachedGVKs map[schema.GroupVersionKind]struct{} cacheUnstructured bool } // blockedMethod is called when a write operation is attempted. // It returns an error, so that the write operation is not forwarded upstream. func (c *interceptingControllerRuntimeClient) blockedMethod(ctx context.Context, method string, args ...any) error { c.parent.recorder.RecordBlockedKubeMethod(ctx, method, args...) return fmt.Errorf("%q blocked in preview mode", method) } // ignoredMethod is called when a read operation is attempted. 
// It returns nil, but does not actually forward the operation upstream. // This is useful for status updates, where we want to record the GCP operation, // which typically happens after the status update is made. func (c *interceptingControllerRuntimeClient) ignoredMethod(ctx context.Context, method string, args ...any) error { c.parent.recorder.RecordIgnoredKubeMethod(ctx, method, args...) return nil } var _ client.Client = &interceptingControllerRuntimeClient{} func (c *interceptingControllerRuntimeClient) shouldBypassCache(isUnstructured bool, typeInfo *typeInfo) bool { if c.cache == nil { return true } gvk := typeInfo.gvk // if meta.IsListType(obj) { // gvk.Kind = strings.TrimSuffix(gvk.Kind, "List") // } if _, isUncached := c.uncachedGVKs[gvk]; isUncached { return true } if !c.cacheUnstructured { return isUnstructured } return false } // Get retrieves an obj for the given object key from the Kubernetes Cluster. // obj must be a struct pointer so that obj can be updated with the response // returned by the Server. func (c *interceptingControllerRuntimeClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { _, isUnstructured := obj.(runtime.Unstructured) typeInfo, err := c.typeStore.getTypeInfo(obj) if err != nil { return err } shouldBypassCache := c.shouldBypassCache(isUnstructured, typeInfo) if !shouldBypassCache { return c.cache.Get(ctx, key, obj, opts...) } client := c.parent.upstreamClient namespace := key.Namespace name := key.Name if len(opts) > 0 { klog.Fatalf("interceptingControllerRuntimeClient: GET %v into %T with opts %v (cache=%+v)", key, obj, opts, c.cache) } return client.Get(ctx, typeInfo, namespace, name, obj) } // List retrieves list of objects for a given namespace and list options. On a // successful call, Items field in the list will be populated with the // result returned from the server. 
func (c *interceptingControllerRuntimeClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
	// Listing through the intercepting client is not supported yet.
	panic("not implemented")
}

// Create is blocked in preview mode: the call is recorded and an error is
// returned instead of creating obj in the Kubernetes cluster.
func (c *interceptingControllerRuntimeClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
	return c.blockedMethod(ctx, "create", obj, opts)
}

// Delete is blocked in preview mode: the call is recorded and an error is
// returned instead of deleting obj from the Kubernetes cluster.
func (c *interceptingControllerRuntimeClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
	return c.blockedMethod(ctx, "delete", obj, opts)
}

// Update is blocked in preview mode: the call is recorded and an error is
// returned instead of updating obj in the Kubernetes cluster.
func (c *interceptingControllerRuntimeClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
	return c.blockedMethod(ctx, "update", obj, opts)
}

// Patch is blocked in preview mode: the call is recorded and an error is
// returned instead of patching obj in the Kubernetes cluster.
// The patch itself is included in the record, matching the subresource Patch.
func (c *interceptingControllerRuntimeClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
	return c.blockedMethod(ctx, "patch", obj, patch, opts)
}

// DeleteAllOf is blocked in preview mode: the call is recorded and an error is
// returned instead of deleting the objects matching the given options.
func (c *interceptingControllerRuntimeClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error {
	return c.blockedMethod(ctx, "deleteAllOf", obj, opts)
}

// Status returns a client which can update the status subresource for kubernetes objects.
func (c *interceptingControllerRuntimeClient) Status() client.SubResourceWriter { return &interceptingControllerRuntimeClientSubResourceWriter{client: c, Subresource: "status"} } func (c *interceptingControllerRuntimeClient) SubResource(subResource string) client.SubResourceClient { return &interceptingControllerRuntimeClientSubResourceWriter{client: c, Subresource: subResource} } // Scheme returns the scheme this client is using. func (c *interceptingControllerRuntimeClient) Scheme() *runtime.Scheme { panic("not implemented") } // RESTMapper returns the rest this client is using. func (c *interceptingControllerRuntimeClient) RESTMapper() meta.RESTMapper { panic("not implemented") } // GroupVersionKindFor returns the GroupVersionKind for the given object. func (c *interceptingControllerRuntimeClient) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) { panic("not implemented") } // IsObjectNamespaced returns true if the GroupVersionKind of the object is namespaced. 
func (c *interceptingControllerRuntimeClient) IsObjectNamespaced(obj runtime.Object) (bool, error) { panic("not implemented") } type interceptingControllerRuntimeClientSubResourceWriter struct { client *interceptingControllerRuntimeClient Subresource string } var _ client.SubResourceWriter = &interceptingControllerRuntimeClientSubResourceWriter{} func (c *interceptingControllerRuntimeClientSubResourceWriter) Get(ctx context.Context, obj client.Object, subResource client.Object, opts ...client.SubResourceGetOption) error { return c.client.blockedMethod(ctx, "status.get", obj, subResource, opts) } func (c *interceptingControllerRuntimeClientSubResourceWriter) Create(ctx context.Context, obj client.Object, subResource client.Object, opts ...client.SubResourceCreateOption) error { return c.client.blockedMethod(ctx, "status.create", obj, subResource, opts) } func (c *interceptingControllerRuntimeClientSubResourceWriter) Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error { return c.client.ignoredMethod(ctx, "status.update", obj, opts) } func (c *interceptingControllerRuntimeClientSubResourceWriter) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error { return c.client.blockedMethod(ctx, "status.patch", obj, patch, opts) } // interceptingControllerRuntimeCache is a controller-runtime cache that intercepts Kubernetes API calls. // Write operations would be blocked, but generally the cache is only used for read operations. type interceptingControllerRuntimeCache struct { streamingClient *StreamingClient typeStore *typeStore mutex sync.Mutex started atomic.Bool informers map[schema.GroupVersionKind]*streamingInformer } // newInterceptingControllerRuntimeCache creates a new interceptingControllerRuntimeCache. 
func newInterceptingControllerRuntimeCache(streamingClient *StreamingClient, typeStore *typeStore) (*interceptingControllerRuntimeCache, error) { return &interceptingControllerRuntimeCache{ streamingClient: streamingClient, informers: make(map[schema.GroupVersionKind]*streamingInformer), typeStore: typeStore, }, nil } var _ cache.Cache = &interceptingControllerRuntimeCache{} // Get retrieves an obj for the given object key from the Kubernetes Cluster. // obj must be a struct pointer so that obj can be updated with the response // returned by the Server. func (c *interceptingControllerRuntimeCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { if len(opts) != 0 { klog.Fatalf("interceptingControllerRuntimeCache: GET %v into %T with opts %v", key, obj, opts) panic("not implemented") } typeInfo, err := c.typeStore.getTypeInfo(obj) if err != nil { return err } informer, err := c.getOrCreateInformer(ctx, typeInfo) if err != nil { return err } return informer.Get(ctx, key, obj) } // List retrieves list of objects for a given namespace and list options. On a // successful call, Items field in the list will be populated with the // result returned from the server. func (c *interceptingControllerRuntimeCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { panic("not implemented") } // GetInformer fetches or constructs an informer for the given object that corresponds to a single // API kind and resource. 
func (c *interceptingControllerRuntimeCache) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) { typeInfo, err := c.typeStore.getTypeInfo(obj) if err != nil { return nil, err } return c.getOrCreateInformer(ctx, typeInfo) } func (c *interceptingControllerRuntimeCache) getOrCreateInformer(ctx context.Context, typeInfo *typeInfo) (*streamingInformer, error) { gvk := typeInfo.gvk c.mutex.Lock() defer c.mutex.Unlock() existing := c.informers[gvk] if existing != nil { return existing, nil } informer, err := newStreamingInformer(c.streamingClient, typeInfo) if err != nil { return nil, err } c.informers[gvk] = informer if c.started.Load() { if err := informer.Start(ctx); err != nil { return nil, fmt.Errorf("error starting informer: %w", err) } } return informer, nil } // GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead // of the underlying object. func (c *interceptingControllerRuntimeCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) { typeInfo, err := c.typeStore.getTypeInfoForGVK(gvk) if err != nil { return nil, err } return c.getOrCreateInformer(ctx, typeInfo) } // Start runs all the informers known to this cache until the context is closed. // It blocks. func (c *interceptingControllerRuntimeCache) Start(ctx context.Context) error { informers := c.snapshotInformers() for _, informer := range informers { if err := informer.Start(ctx); err != nil { return err } } c.started.Store(true) return nil } // WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache. func (c *interceptingControllerRuntimeCache) WaitForCacheSync(ctx context.Context) bool { informers := c.snapshotInformers() for _, informer := range informers { if !informer.WaitForCacheSync(ctx) { return false } } return true } // snapshotInformers is a helper function that returns a snapshot of the informers. 
// It is used to avoid race conditions when iterating over the informers. func (c *interceptingControllerRuntimeCache) snapshotInformers() []*streamingInformer { c.mutex.Lock() defer c.mutex.Unlock() return slices.Collect(maps.Values(c.informers)) } // IndexFields adds an index with the given field name on the given object type // by using the given function to extract the value for that field. If you want // compatibility with the Kubernetes API server, only return one key, and only use // fields that the API server supports. Otherwise, you can return multiple keys, // and "equality" in the field selector means that at least one key matches the value. // The FieldIndexer will automatically take care of indexing over namespace // and supporting efficient all-namespace queries. func (c *interceptingControllerRuntimeCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error { panic("not implemented") }