func()

in internal/flowcontrol/writebuffer.go [97:143]


// processQueueItem takes one slice key off the work queue and tries to flush
// the status updates currently buffered for that slice by calling updateSlice.
//
// At most maxUpdatesPerPatch updates are sent per call; anything beyond that
// stays in the buffer for a later pass. When updateSlice reports false the
// batch is put back at the front of the buffer so it is retried first.
//
// It returns false only when the queue has been shut down. In every other
// case it returns true, whether or not anything was written.
func (w *ResourceSliceWriteBuffer) processQueueItem(ctx context.Context) bool {
	item, shutdown := w.queue.Get()
	if shutdown {
		return false
	}
	defer w.queue.Done(item)
	sliceNSN := item.(types.NamespacedName)

	logger := logr.FromContextOrDiscard(ctx).WithValues("resourceSliceName", sliceNSN.Name, "resourceSliceNamespace", sliceNSN.Namespace, "controller", "writeBuffer")
	ctx = logr.NewContext(ctx, logger)

	// Limit the number of operations per patch request.
	// Deliberately not called "max" so the builtin of that name is not shadowed.
	const maxUpdatesPerPatch = (10000 / 2) - 2 // 2 ops to initialize status + 2 ops per resource

	w.mut.Lock()
	insertionTime := w.insertionTime[sliceNSN]
	updates := w.state[sliceNSN]
	delete(w.state, sliceNSN)
	switch {
	case len(updates) == 0:
		// Drop the insertion time in the same critical section that observed the
		// empty buffer. Doing it after releasing and re-acquiring the lock could
		// erase the timestamp of an update that was buffered in between.
		delete(w.insertionTime, sliceNSN)
	case len(updates) > maxUpdatesPerPatch:
		w.state[sliceNSN] = updates[maxUpdatesPerPatch:]
		// Cap the batch's capacity at its length: the tail left in the buffer
		// shares the same backing array, and a full slice expression guarantees
		// that appending to the batch reallocates instead of writing into it.
		updates = updates[:maxUpdatesPerPatch:maxUpdatesPerPatch]
	}
	w.mut.Unlock()

	// We only forget the rate limit once the update queue for this slice is empty.
	// So the first write is fast, but a steady stream of writes will be throttled exponentially.
	if len(updates) == 0 {
		w.queue.Forget(item)
		return true // nothing to do
	}

	if !w.updateSlice(ctx, insertionTime, sliceNSN, updates) {
		// Put the updates back in the buffer, ahead of anything that arrived in
		// the meantime, to retry on the next attempt.
		w.mut.Lock()
		w.state[sliceNSN] = append(updates, w.state[sliceNSN]...)
		w.mut.Unlock()
	}

	// Requeue in both cases: after a failure to retry, and after a success so a
	// later pass can either flush what remains or reset the rate limiter once
	// the buffer is found empty.
	w.queue.AddRateLimited(item)
	return true
}