in internal/flowcontrol/writebuffer.go [97:143]
func (w *ResourceSliceWriteBuffer) processQueueItem(ctx context.Context) bool {
	item, shutdown := w.queue.Get()
	if shutdown {
		return false
	}
	defer w.queue.Done(item)

	sliceNSN := item.(types.NamespacedName)
	logger := logr.FromContextOrDiscard(ctx).WithValues("resourceSliceName", sliceNSN.Name, "resourceSliceNamespace", sliceNSN.Namespace, "controller", "writeBuffer")
	ctx = logr.NewContext(ctx, logger)

	w.mut.Lock()
	insertionTime := w.insertionTime[sliceNSN]
	updates := w.state[sliceNSN]
	delete(w.state, sliceNSN)

	// Limit the number of operations per patch request
	const max = (10000 / 2) - 2 // 2 ops to initialize status + 2 ops per resource
	if len(updates) > max {
		w.state[sliceNSN] = updates[max:]
		updates = updates[:max]
	}
	w.mut.Unlock()

	// We only forget the rate limit once the update queue for this slice is empty.
	// So the first write is fast, but a steady stream of writes will be throttled exponentially.
	if len(updates) == 0 {
		w.queue.Forget(item)
		w.mut.Lock()
		delete(w.insertionTime, sliceNSN)
		w.mut.Unlock()
		return true // nothing to do
	}

	if w.updateSlice(ctx, insertionTime, sliceNSN, updates) {
		// Success: re-add with backoff so any updates buffered (or split off
		// above) while this write was in flight are handled on the next pass.
		w.queue.AddRateLimited(item)
		return true
	}

	// Put the updates back in the buffer to retry on the next attempt
	w.mut.Lock()
	w.state[sliceNSN] = append(updates, w.state[sliceNSN]...)
	w.mut.Unlock()
	w.queue.AddRateLimited(item)
	return true
}
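
// Illustrative sketch (not part of writebuffer.go): processQueueItem is written
// against the client-go rate-limiting workqueue contract (Get/Done, AddRateLimited,
// Forget, shutdown). The rate limiter choice, queue construction, and worker loop
// below are assumptions for illustration only; they show how "the first write is
// fast, but a steady stream of writes will be throttled exponentially" falls out
// of Forget resetting a key's backoff once its buffer drains.
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Per-item exponential backoff: the first AddRateLimited of a key is delayed
	// by the base delay only; repeated re-adds of the same key back off
	// exponentially until Forget resets its failure count.
	rl := workqueue.NewItemExponentialFailureRateLimiter(50*time.Millisecond, 10*time.Second)
	queue := workqueue.NewRateLimitingQueue(rl)

	key := types.NamespacedName{Namespace: "default", Name: "slice-0"}
	queue.AddRateLimited(key)
	queue.AddRateLimited(key) // deduplicated by the queue, but the backoff still grows
	fmt.Println("requeues so far:", rl.NumRequeues(key))

	go func() {
		time.Sleep(time.Second)
		queue.ShutDown() // makes Get return shutdown=true, ending the worker loop
	}()

	// Typical worker loop, mirroring processQueueItem's contract of returning
	// false only once the queue has shut down.
	for {
		item, shutdown := queue.Get()
		if shutdown {
			break
		}
		queue.Forget(item) // success path: reset this key's backoff
		queue.Done(item)
	}
}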