pkg/cli/preview/streaming_informer.go (201 lines of code) (raw):

// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package preview import ( "context" "encoding/json" "fmt" "sync" "sync/atomic" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" toolscache "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" ) // streamingInformer is an informer that streams events from the Kubernetes API server. // It implements cache.Informer, but allows us to intercept / mutate operations. type streamingInformer struct { streamingClient *StreamingClient typeInfo *typeInfo mutex sync.Mutex eventHandlerRegistrations []*eventHandlerRegistration resyncPeriod time.Duration hasSynced atomic.Bool objects objects } // objects is a map of objects by their namespaced name. type objects struct { mutex sync.Mutex store map[types.NamespacedName]Object } // OnListObject is called from list, the lock should be held func (o *objects) OnListObject(obj Object, isInInitialList bool, eventHandlerRegistrations []*eventHandlerRegistration) { id := types.NamespacedName{ Namespace: obj.GetNamespace(), Name: obj.GetName(), } o.store[id] = obj for _, handler := range eventHandlerRegistrations { handler.handler.OnAdd(obj, false) } } // OnWatchAdd is called from watch, the lock is not held func (o *objects) OnWatchAdd(obj Object, eventHandlerRegistrations []*eventHandlerRegistration) { id := types.NamespacedName{ Namespace: obj.GetNamespace(), Name: obj.GetName(), } o.mutex.Lock() defer o.mutex.Unlock() o.store[id] = obj for _, handler := range eventHandlerRegistrations { handler.handler.OnAdd(obj, false) } } var _ cache.Informer = &streamingInformer{} // newStreamingInformer creates a new streaming informer. func newStreamingInformer(streamingClient *StreamingClient, typeInfo *typeInfo) (*streamingInformer, error) { s := &streamingInformer{ streamingClient: streamingClient, typeInfo: typeInfo, } s.objects = objects{ store: make(map[types.NamespacedName]Object), } return s, nil } // Start starts the informer. func (i *streamingInformer) Start(ctx context.Context) error { go i.run(ctx) return nil } // run runs the informer. func (i *streamingInformer) run(ctx context.Context) { log := klog.FromContext(ctx) for { err := i.runOnce(ctx) if err != nil { // If the context is closed, don't log as much if err := ctx.Err(); err != nil { log.V(2).Info("context closed; stopping informer") return } log.Error(err, "running list/watch for informer (will retry)") } // TODO: Backoff time.Sleep(2 * time.Second) } } // runOnce runs the informer once. func (i *streamingInformer) runOnce(ctx context.Context) error { listMetadata, err := i.doList(ctx) if err != nil { return err } i.hasSynced.Store(true) watchListener := &watchListener{objects: &i.objects, eventHandlerRegistrations: i.eventHandlerRegistrations} watchOptions := WatchOptions{ ResourceVersion: listMetadata.ResourceVersion, AllowWatchBookmarks: true, } if err := i.streamingClient.Watch(ctx, i.typeInfo, watchOptions, watchListener); err != nil { return err } return fmt.Errorf("watch finished unexpectedly") } // doList lists the objects for the given type. func (i *streamingInformer) doList(ctx context.Context) (*ListMetadata, error) { i.objects.mutex.Lock() defer i.objects.mutex.Unlock() isInInitialList := !i.hasSynced.Load() listListener := &listListener{objects: &i.objects, isInInitialList: isInInitialList, eventHandlerRegistrations: i.eventHandlerRegistrations} if err := i.streamingClient.List(ctx, i.typeInfo, listListener); err != nil { return nil, err } return &listListener.metadata, nil } // listListener is a listener for list operations. type listListener struct { isInInitialList bool eventHandlerRegistrations []*eventHandlerRegistration objects *objects metadata ListMetadata } // OnListBegin is called when the list operation begins. func (i *listListener) OnListBegin(metadata ListMetadata) { i.metadata = metadata } // OnListObject is called when an object is listed. func (i *listListener) OnListObject(obj Object) error { i.objects.OnListObject(obj, i.isInInitialList, i.eventHandlerRegistrations) return nil } // OnListEnd is called when the list operation ends. func (i *listListener) OnListEnd() { } type watchListener struct { objects *objects eventHandlerRegistrations []*eventHandlerRegistration } // OnWatchEvent is called when a watch event occurs. func (i *watchListener) OnWatchEvent(eventType string, obj Object) error { switch eventType { case "ADDED": i.objects.OnWatchAdd(obj, i.eventHandlerRegistrations) return nil case "BOOKMARK": klog.Infof("BOOKMARK %+v", obj) return nil default: return fmt.Errorf("unknown event type: %q", eventType) } } // AddEventHandler adds an event handler to the shared informer using the shared informer's resync // period. Events to a single handler are delivered sequentially, but there is no coordination // between different handlers. // It returns a registration handle for the handler that can be used to remove // the handler again. func (i *streamingInformer) AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error) { return i.AddEventHandlerWithResyncPeriod(handler, i.resyncPeriod) } // AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the // specified resync period. Events to a single handler are delivered sequentially, but there is // no coordination between different handlers. // It returns a registration handle for the handler that can be used to remove // the handler again and an error if the handler cannot be added. func (i *streamingInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) { i.mutex.Lock() defer i.mutex.Unlock() registration := &eventHandlerRegistration{handler: handler, resyncPeriod: resyncPeriod} // TODO: Propagate correctly to ListWatch i.eventHandlerRegistrations = append(i.eventHandlerRegistrations, registration) if i.hasSynced.Load() { // TODO: Snapshot for _, obj := range i.objects.store { registration.handler.OnAdd(obj, true) } } return registration, nil } // RemoveEventHandler removes a formerly added event handler given by // its registration handle. // This function is guaranteed to be idempotent, and thread-safe. func (i *streamingInformer) RemoveEventHandler(handle toolscache.ResourceEventHandlerRegistration) error { panic("not implemented") } // AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. func (i *streamingInformer) AddIndexers(indexers toolscache.Indexers) error { panic("not implemented") } // HasSynced return true if the informers underlying store has synced. func (i *streamingInformer) HasSynced() bool { return i.hasSynced.Load() } // WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache. func (i *streamingInformer) WaitForCacheSync(ctx context.Context) bool { timer := time.NewTicker(50 * time.Millisecond) defer timer.Stop() for { if i.HasSynced() { return true } // Check for timer or ctx.Done() select { case <-timer.C: // Check again case <-ctx.Done(): return false } } } type eventHandlerRegistration struct { informer *streamingInformer handler toolscache.ResourceEventHandler resyncPeriod time.Duration } // HasSynced reports if both the parent has synced and all pre-sync // events have been delivered. func (i *eventHandlerRegistration) HasSynced() bool { return i.informer.HasSynced() } // Get retrieves an obj for the given object key from the Kubernetes Cluster. // obj must be a struct pointer so that obj can be updated with the response // returned by the Server. func (i *streamingInformer) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { if len(opts) != 0 { return fmt.Errorf("options not implemented: %v", opts) } i.objects.mutex.Lock() defer i.objects.mutex.Unlock() existing, ok := i.objects.store[key] if !ok { return apierrors.NewNotFound(i.typeInfo.GroupResource(), key.String()) } // TODO: How do we want to copy objects? b, err := json.Marshal(existing) if err != nil { return fmt.Errorf("error copying %T: %w", obj, err) } if err := json.Unmarshal(b, obj); err != nil { return fmt.Errorf("error copying %T: %w", obj, err) } return nil }