// internal/flowcontrol/writebuffer.go

package flowcontrol

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	apiv1 "github.com/Azure/eno/api/v1"
	"github.com/Azure/eno/internal/resource"
	"github.com/go-logr/logr"
)

// StatusPatchFn maps a resource's current state to its desired state.
// A nil return signals that no status update is required.
type StatusPatchFn func(*apiv1.ResourceState) *apiv1.ResourceState

// resourceSliceStatusUpdate is one buffered status mutation for a single
// resource within a resource slice.
type resourceSliceStatusUpdate struct {
	SlicedResource *resource.ManifestRef
	PatchFn        StatusPatchFn
}

// ResourceSliceWriteBuffer reduces load on etcd/apiserver by collecting resource slice status
// updates over a short period of time and applying them in a single patch request.
type ResourceSliceWriteBuffer struct {
	client client.Client

	// queue items are per-slice.
	// the state map collects multiple updates per slice to be dispatched by next queue item.
	mut           sync.Mutex
	state         map[types.NamespacedName][]*resourceSliceStatusUpdate
	insertionTime map[types.NamespacedName]time.Time
	queue         workqueue.RateLimitingInterface
}

// NewResourceSliceWriteBufferForManager constructs a write buffer backed by
// the manager's client and registers it as a runnable on the manager.
func NewResourceSliceWriteBufferForManager(mgr ctrl.Manager) *ResourceSliceWriteBuffer {
	buffer := NewResourceSliceWriteBuffer(mgr.GetClient())
	mgr.Add(buffer) // NOTE(review): Add's error is discarded here — confirm this is intentional
	return buffer
}

// NewResourceSliceWriteBuffer constructs a write buffer around the given client.
func NewResourceSliceWriteBuffer(cli client.Client) *ResourceSliceWriteBuffer {
	// Exponential backoff from 100ms up to 8s per slice key.
	limiter := workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond*100, 8*time.Second)
	return &ResourceSliceWriteBuffer{
		client:        cli,
		state:         make(map[types.NamespacedName][]*resourceSliceStatusUpdate),
		insertionTime: make(map[types.NamespacedName]time.Time),
		queue: workqueue.NewRateLimitingQueueWithConfig(limiter, workqueue.RateLimitingQueueConfig{
			Name: "writeBuffer",
		}),
	}
}

// PatchStatusAsync returns after enqueueing the given status update. The update will eventually be applied, or dropped only if the slice is deleted.
func (w *ResourceSliceWriteBuffer) PatchStatusAsync(ctx context.Context, ref *resource.ManifestRef, patchFn StatusPatchFn) {
	w.mut.Lock()
	defer w.mut.Unlock()

	// Updates are keyed by slice; multiple updates for the same slice are
	// coalesced into one queue item.
	key := ref.Slice
	currentSlice := w.state[key]
	for i, item := range currentSlice {
		if *item.SlicedResource == *ref {
			// last write wins
			currentSlice[i].PatchFn = patchFn
			return
		}
	}

	// Record when the first pending update for this slice arrived so the
	// eventual flush can report its buffering latency.
	if _, found := w.insertionTime[key]; !found {
		w.insertionTime[key] = time.Now()
	}

	w.state[key] = append(currentSlice, &resourceSliceStatusUpdate{
		SlicedResource: ref,
		PatchFn:        patchFn,
	})

	// Only enqueue if the key isn't already being rate limited; otherwise the
	// already-scheduled queue item will pick up this update from the state map.
	if w.queue.NumRequeues(key) == 0 {
		w.queue.AddRateLimited(key)
	}
}

// Start runs the buffer's dispatch loop until ctx is cancelled, at which point
// the queue is shut down and the loop drains. Implements the
// controller-runtime Runnable contract (blocking, returns nil on shutdown).
func (w *ResourceSliceWriteBuffer) Start(ctx context.Context) error {
	go func() {
		<-ctx.Done()
		w.queue.ShutDown()
	}()
	for w.processQueueItem(ctx) {
	}
	return nil
}

// processQueueItem flushes the buffered updates for one slice. It returns
// false only when the queue has been shut down.
func (w *ResourceSliceWriteBuffer) processQueueItem(ctx context.Context) bool {
	item, shutdown := w.queue.Get()
	if shutdown {
		return false
	}
	defer w.queue.Done(item)
	sliceNSN := item.(types.NamespacedName)

	logger := logr.FromContextOrDiscard(ctx).WithValues("resourceSliceName", sliceNSN.Name, "resourceSliceNamespace", sliceNSN.Namespace, "controller", "writeBuffer")
	ctx = logr.NewContext(ctx, logger)

	// Take ownership of this slice's pending updates under the lock.
	w.mut.Lock()
	insertionTime := w.insertionTime[sliceNSN]
	updates := w.state[sliceNSN]
	delete(w.state, sliceNSN)

	// Limit the number of operations per patch request
	// NOTE(review): `max` shadows the Go 1.21 builtin of the same name — harmless here, but worth renaming.
	const max = (10000 / 2) - 2 // 2 ops to initialize status + 2 ops per resource
	if len(updates) > max {
		// Leave the overflow in the buffer; it will be flushed on a later pass.
		w.state[sliceNSN] = updates[max:]
		updates = updates[:max]
	}
	w.mut.Unlock()

	// We only forget the rate limit once the update queue for this slice is empty.
	// So the first write is fast, but a steady stream of writes will be throttled exponentially.
	if len(updates) == 0 {
		w.queue.Forget(item)
		w.mut.Lock()
		delete(w.insertionTime, sliceNSN)
		w.mut.Unlock()
		return true // nothing to do
	}

	if w.updateSlice(ctx, insertionTime, sliceNSN, updates) {
		// Requeue even on success so any updates buffered while we were
		// writing get flushed (and the rate limit is forgotten by the empty pass above).
		w.queue.AddRateLimited(item)
		return true
	}

	// Put the updates back in the buffer to retry on the next attempt
	// NOTE(review): updates buffered during the failed write are appended after the
	// retried ones without de-duplication, so the same ManifestRef can appear twice;
	// buildPatch would then emit a second "test" op against the pre-patch value,
	// which looks like it would fail server-side — confirm and dedupe if so.
	w.mut.Lock()
	w.state[sliceNSN] = append(updates, w.state[sliceNSN]...)
	w.mut.Unlock()
	w.queue.AddRateLimited(item)
	return true
}

// updateSlice applies the given updates to the slice's status subresource in a
// single JSON patch request. It returns true when the updates were applied or
// can safely be dropped (slice deleted, nothing to change), and false when the
// caller should retry.
func (w *ResourceSliceWriteBuffer) updateSlice(ctx context.Context, insertionTime time.Time, sliceNSN types.NamespacedName, updates []*resourceSliceStatusUpdate) (success bool) {
	logger := logr.FromContextOrDiscard(ctx)

	slice := &apiv1.ResourceSlice{}
	slice.Name = sliceNSN.Name
	slice.Namespace = sliceNSN.Namespace
	err := w.client.Get(ctx, client.ObjectKeyFromObject(slice), slice)
	// A NotFound error falls through: buildPatch runs against the empty slice
	// and the subsequent Patch surfaces NotFound, which is handled below.
	if client.IgnoreNotFound(err) != nil {
		logger.Error(err, "unable to get resource slice")
		return false
	}

	patches := w.buildPatch(slice, updates)
	if len(patches) == 0 {
		return true // nothing to do!
	}

	patchJson, err := json.Marshal(&patches)
	if err != nil {
		logger.Error(err, "unable to encode patch")
		return false
	}

	err = w.client.Status().Patch(ctx, slice, client.RawPatch(types.JSONPatchType, patchJson))
	if errors.IsNotFound(err) {
		logger.V(1).Info("resource slice deleted - dropping buffered status updates")
		return true
	}
	if err != nil {
		// Includes "test" op failures when the slice changed since we read it;
		// the caller re-buffers the updates and retries with fresh state.
		logger.Error(err, "unable to update resource slice")
		return false
	}

	logger.V(0).Info(fmt.Sprintf("updated the status of %d resources in slice", len(updates)), "latency", time.Since(insertionTime).Abs().Milliseconds())
	sliceStatusUpdates.Inc()
	return true
}

// buildPatch translates buffered updates into RFC 6902 JSON patch operations.
// Each replace is paired with a "test" op so the patch fails (and is retried)
// if the status changed since the slice was read — optimistic concurrency
// without resourceVersion.
func (*ResourceSliceWriteBuffer) buildPatch(slice *apiv1.ResourceSlice, updates []*resourceSliceStatusUpdate) []*jsonPatch {
	var patches []*jsonPatch

	// Points into slice.Status.Resources — "unsafe" because patch functions
	// receive pointers into this shared array and must not retain them.
	unsafeSlice := slice.Status.Resources
	if len(unsafeSlice) == 0 {
		// Initialize the status and its resources array, guarded by a "test"
		// that it is still unset (another writer may have initialized it).
		patches = append(patches, &jsonPatch{
			Op:    "add",
			Path:  "/status",
			Value: map[string]any{},
		}, &jsonPatch{
			Op:    "test",
			Path:  "/status/resources",
			Value: nil,
		}, &jsonPatch{
			Op:    "add",
			Path:  "/status/resources",
			Value: make([]apiv1.ResourceState, len(slice.Spec.Resources)),
		})
	}

	for _, update := range updates {
		if update.SlicedResource.Index > len(slice.Spec.Resources)-1 || update.SlicedResource.Index < 0 {
			continue // impossible
		}

		// Hand the patch function the current state, or a zero state when the
		// status array hasn't been populated up to this index yet.
		var unsafeStatusPtr *apiv1.ResourceState
		if len(unsafeSlice) <= update.SlicedResource.Index {
			unsafeStatusPtr = &apiv1.ResourceState{}
		} else {
			unsafeStatusPtr = &unsafeSlice[update.SlicedResource.Index]
		}

		patch := update.PatchFn(unsafeStatusPtr)
		if patch == nil {
			continue // patch fn decided no update is needed
		}

		path := fmt.Sprintf("/status/resources/%d", update.SlicedResource.Index)
		patches = append(patches, &jsonPatch{
			Op:    "test", // make sure the current state is equal to the state we built the patch against
			Path:  path,
			Value: unsafeStatusPtr,
		}, &jsonPatch{
			Op:    "replace",
			Path:  path,
			Value: patch,
		})
	}

	return patches
}

// jsonPatch is a single RFC 6902 JSON patch operation.
type jsonPatch struct {
	Op    string `json:"op"`
	Path  string `json:"path"`
	Value any    `json:"value"`
}