internal/resource/cache.go (111 lines of code) (raw):
package resource
import (
"context"
"sync"
apiv1 "github.com/Azure/eno/api/v1"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
)
// Request is the work item the Cache adds to its queue: it names one resource
// together with the composition that resource belongs to.
type Request struct {
// Resource is the reference of the resource that should be processed.
Resource Ref
// Composition is the namespace/name of the owning composition.
Composition types.NamespacedName
}
// Cache caches resources indexed and logically grouped by the UUID of the synthesis that produced them.
// Kind of like an informer but optimized for Eno.
//
// A queue must be installed with SetQueue before Visit, Fill, or Purge is
// called; those methods panic otherwise. The maps are allocated lazily.
type Cache struct {
// mut is held by every exported method while it touches the fields below.
mut sync.Mutex
// queue receives a Request for each resource reference reported while Visit updates state.
queue workqueue.TypedRateLimitingInterface[Request]
// syntheses maps a synthesis UUID to the tree of resources loaded for it by Fill.
syntheses map[string]*tree
// synByComp lists, per composition, the synthesis UUIDs that Fill has stored; Purge walks it.
synByComp map[types.NamespacedName][]string
}
// initUnlocked lazily allocates the cache's maps and then checks that a queue
// has been installed, panicking if it has not. It does not take c.mut itself;
// the callers in this file invoke it while already holding the lock.
func (c *Cache) initUnlocked() {
	if c.syntheses == nil {
		c.syntheses = make(map[string]*tree)
	}
	if c.synByComp == nil {
		c.synByComp = make(map[types.NamespacedName][]string)
	}
	if c.queue != nil {
		return
	}
	panic("attempted to use resource cache without a queue")
}
// SetQueue installs the queue that the cache adds Requests to.
// It panics if a non-nil queue has already been installed.
func (c *Cache) SetQueue(queue workqueue.TypedRateLimitingInterface[Request]) {
	c.mut.Lock()
	defer c.mut.Unlock()

	if c.queue == nil {
		c.queue = queue
		return
	}
	panic("attempted to replace queue in resource cache")
}
// Get looks up ref within the cached synthesis identified by synthesisUUID.
// When that synthesis is not cached it returns (nil, false, false); otherwise
// it returns whatever the synthesis tree's own Get reports for ref.
func (c *Cache) Get(ctx context.Context, synthesisUUID string, ref Ref) (res *Resource, visible, found bool) {
	c.mut.Lock()
	defer c.mut.Unlock()

	if syn, cached := c.syntheses[synthesisUUID]; cached {
		return syn.Get(ref)
	}
	return nil, false, false
}
// Visit takes a set of resource slices from the informers and pushes the
// per-resource state they carry into the cached synthesis identified by synUUID.
//
// For every manifest index in each slice's spec, the matching status entry is
// forwarded to the synthesis tree; indices without a status entry get the zero
// ResourceState. Any resource reference the tree reports through its callback
// is added to the queue as a Request for comp.
//
// It returns false, without touching anything, when the synthesis is not in the cache.
func (c *Cache) Visit(ctx context.Context, comp *apiv1.Composition, synUUID string, items []apiv1.ResourceSlice) bool {
	c.mut.Lock()
	defer c.mut.Unlock()
	c.initUnlocked()

	syn, cached := c.syntheses[synUUID]
	if !cached {
		return false
	}

	compNSN := types.NamespacedName{Namespace: comp.Namespace, Name: comp.Name}
	enqueue := func(r Ref) {
		c.queue.Add(Request{Resource: r, Composition: compNSN})
	}

	for _, slice := range items {
		sliceNSN := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
		statuses := slice.Status.Resources
		for idx := range slice.Spec.Resources {
			// The status list may be shorter than the spec list.
			var state apiv1.ResourceState
			if idx < len(statuses) {
				state = statuses[idx]
			}
			syn.UpdateState(comp, ManifestRef{Slice: sliceNSN, Index: idx}, &state, enqueue)
		}
	}
	return true
}
// Fill populates the cache with resources from a synthesis. Call Visit first to see if filling the cache is necessary.
// Get the resource slices from the API - not the informers, which prune out the manifests to save memory.
//
// Every manifest in items is converted with NewResource and collected into a
// tree that is stored under synUUID and recorded against comp. If any manifest
// fails to convert, the error is logged and the cache is left unchanged.
// Filling the same synUUID again replaces its tree without recording the UUID
// a second time. Panics if no queue has been set.
func (c *Cache) Fill(ctx context.Context, comp types.NamespacedName, synUUID string, items []apiv1.ResourceSlice) {
	logger := logr.FromContextOrDiscard(ctx)

	// The tree is built before the lock is taken, so only the final map
	// updates happen while holding c.mut.
	var builder treeBuilder
	for _, slice := range items {
		// Per-iteration copy: the pointer handed to NewResource must not alias
		// the loop variable (which is shared across iterations before Go 1.22).
		slice := slice
		for i := range slice.Spec.Resources {
			res, err := NewResource(ctx, &slice, i)
			if err != nil {
				// This should be impossible since the synthesis executor process will not produce invalid resources
				logger.Error(err, "invalid resource - cannot load into cache", "resourceSliceName", slice.Name, "resourceIndex", i)
				return
			}
			builder.Add(res)
		}
	}
	built := builder.Build()

	func() {
		c.mut.Lock()
		// Deferred so that a panic from initUnlocked (no queue configured)
		// cannot leave the mutex held and deadlock later callers.
		defer c.mut.Unlock()
		c.initUnlocked()

		c.syntheses[synUUID] = built

		// Record the UUID only once per composition so repeated fills of the
		// same synthesis do not grow the index.
		for _, known := range c.synByComp[comp] {
			if known == synUUID {
				return
			}
		}
		c.synByComp[comp] = append(c.synByComp[comp], synUUID)
	}()

	logger.V(1).Info("resource cache filled", "synthesisUUID", synUUID)
}
// Purge removes the cached syntheses recorded for compNSN that are no longer
// referenced by comp, i.e. whose UUID matches neither comp's current nor its
// previous synthesis. If comp is nil, every synthesis recorded for compNSN is
// purged. Syntheses recorded for other compositions are never touched.
//
// When nothing remains for compNSN its index entry is dropped entirely, so
// deleted compositions do not leave empty entries behind. Panics if no queue
// has been set.
func (c *Cache) Purge(ctx context.Context, compNSN types.NamespacedName, comp *apiv1.Composition) {
	logger := logr.FromContextOrDiscard(ctx)
	c.mut.Lock()
	defer c.mut.Unlock()
	c.initUnlocked()

	// referenced reports whether comp still points at the given synthesis.
	referenced := func(uuid string) bool {
		if comp == nil {
			return false
		}
		cur, prev := comp.Status.CurrentSynthesis, comp.Status.PreviousSynthesis
		return (cur != nil && cur.UUID == uuid) || (prev != nil && prev.UUID == uuid)
	}

	var remaining []string
	for _, uuid := range c.synByComp[compNSN] {
		if referenced(uuid) {
			remaining = append(remaining, uuid)
			continue // still referenced
		}
		logger.V(1).Info("resource cache purged", "synthesisUUID", uuid)
		delete(c.syntheses, uuid)
	}

	if len(remaining) == 0 {
		// Remove the key instead of storing an empty slice; otherwise the map
		// keeps one entry per composition ever purged.
		delete(c.synByComp, compNSN)
		return
	}
	c.synByComp[compNSN] = remaining
}