internal/handler/cache.go (196 lines of code) (raw):

package handler import ( "context" "errors" "fmt" "strings" "sync" "time" "github.com/go-logr/logr" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "github.com/Azure/operation-cache-controller/api/v1alpha1" ctrlutils "github.com/Azure/operation-cache-controller/internal/utils/controller" randutils "github.com/Azure/operation-cache-controller/internal/utils/rand" "github.com/Azure/operation-cache-controller/internal/utils/reconciler" ) //go:generate mockgen -destination=./mocks/mock_cache.go -package=mocks github.com/Azure/operation-cache-controller/internal/handler CacheHandlerInterface type CacheHandlerInterface interface { CheckCacheExpiry(ctx context.Context) (reconciler.OperationResult, error) EnsureCacheInitialized(ctx context.Context) (reconciler.OperationResult, error) CalculateKeepAliveCount(ctx context.Context) (reconciler.OperationResult, error) AdjustCache(ctx context.Context) (reconciler.OperationResult, error) } type CacheHandler struct { cache *v1alpha1.Cache logger logr.Logger client client.Client scheme *runtime.Scheme recorder record.EventRecorder cacheUtils ctrlutils.CacheHelper oputils ctrlutils.OperationHelper setControllerReferenceFunc func(owner, controlled metav1.Object, scheme *runtime.Scheme, opts ...controllerutil.OwnerReferenceOption) error } func NewCacheHandler(ctx context.Context, cache *v1alpha1.Cache, logger logr.Logger, client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, fn func(owner, controlled metav1.Object, scheme *runtime.Scheme, opts ...controllerutil.OwnerReferenceOption) error) CacheHandlerInterface { return &CacheHandler{ cache: cache, logger: logger, client: client, scheme: scheme, recorder: recorder, setControllerReferenceFunc: fn, } } // updateStatus updates the status of the cache cr func (c *CacheHandler) updateStatus(ctx context.Context) error { if err := c.client.Status().Update(ctx, c.cache); err != nil { return fmt.Errorf("unable to update cache status: %w", err) } return nil } // CheckCacheExpiry checks if the cache cr is expired. If it is, the cr is deleted. func (c *CacheHandler) CheckCacheExpiry(ctx context.Context) (reconciler.OperationResult, error) { if c.cache.Spec.ExpireTime == "" { return reconciler.ContinueProcessing() } ce, err := time.Parse(time.RFC3339, c.cache.Spec.ExpireTime) if err != nil { c.logger.Error(err, "failed to parse expire time") // TODO: set cache expiry condition if needed return reconciler.ContinueProcessing() } if time.Now().After(ce) { c.logger.Info("cache is expired, deleting cache cr") if err := c.client.Delete(ctx, c.cache); err != nil { return reconciler.RequeueWithError(err) } return reconciler.StopProcessing() } return reconciler.ContinueProcessing() } // EnsureCacheInitialized ensures the cache cr is initialized func (c *CacheHandler) EnsureCacheInitialized(ctx context.Context) (reconciler.OperationResult, error) { // initialize the AvailableCaches in status if it is nil if c.cache.Status.AvailableCaches == nil { c.cache.Status.AvailableCaches = []string{} } if c.cache.Status.CacheKey == "" { c.cache.Status.CacheKey = c.cacheUtils.NewCacheKeyFromApplications(c.cache.Spec.OperationTemplate.Applications) } return reconciler.RequeueOnErrorOrContinue(c.updateStatus(ctx)) } // CalculateKeepAliveCount calculates the keepAliveCount for the cache cr func (c *CacheHandler) CalculateKeepAliveCount(ctx context.Context) (reconciler.OperationResult, error) { // before we have cache service to provide the keepAliveCount, we use fixed value c.cache.Status.KeepAliveCount = 5 return reconciler.RequeueOnErrorOrContinue(c.updateStatus(ctx)) } func (c *CacheHandler) createOperationsAsync(ctx context.Context, ops []*v1alpha1.Operation) error { wg := sync.WaitGroup{} errChan := make(chan error, len(ops)) for _, op := range ops { wg.Add(1) go func() { defer wg.Done() errChan <- c.client.Create(ctx, op) }() } wg.Wait() close(errChan) var errs error for err := range errChan { errs = errors.Join(errs, err) } return errs } func (c *CacheHandler) deleteOperationsAsync(ctx context.Context, ops []*v1alpha1.Operation) error { wg := sync.WaitGroup{} errChan := make(chan error, len(ops)) for _, op := range ops { wg.Add(1) go func() { defer wg.Done() errChan <- c.client.Delete(ctx, op) }() } wg.Wait() close(errChan) var errs error for err := range errChan { errs = errors.Join(errs, err) } return errs } func (c *CacheHandler) initOperationFromCache(operationName string) *v1alpha1.Operation { op := &v1alpha1.Operation{} annotations := op.GetAnnotations() if annotations == nil { annotations = map[string]string{} } annotations[ctrlutils.AnnotationNameCacheMode] = ctrlutils.AnnotationValueTrue labels := op.GetLabels() if labels == nil { labels = map[string]string{} } // TODO: set up requirement label instead cacheKeyLabelValue := c.cache.Status.CacheKey if len(c.cache.Status.CacheKey) > 63 { cacheKeyLabelValue = cacheKeyLabelValue[:63] } labels[ctrlutils.LabelNameCacheKey] = cacheKeyLabelValue op.SetAnnotations(annotations) op.SetNamespace(c.cache.Namespace) op.SetName(operationName) op.SetLabels(labels) op.Spec = c.cache.Spec.OperationTemplate return op } func (c *CacheHandler) AdjustCache(ctx context.Context) (reconciler.OperationResult, error) { var ownedOps v1alpha1.OperationList if err := c.client.List(ctx, &ownedOps, client.InNamespace(c.cache.Namespace), client.MatchingFields{v1alpha1.CacheOwnerKey: c.cache.Name}); err != nil { return reconciler.RequeueWithError(err) } availableCaches := []string{} for _, op := range ownedOps.Items { if c.oputils.IsOperationReady(&op) { availableCaches = append(availableCaches, op.Name) } } c.cache.Status.AvailableCaches = availableCaches keepAliveCount := int(c.cache.Status.KeepAliveCount) cacheBalance := len(availableCaches) - keepAliveCount switch { case cacheBalance == 0: // do nothing: should we remove the not available operations? case cacheBalance > 0: // remove all the not available operations and cut available operations down to keepAliveCount availableCacheNumToRemove := cacheBalance opsToRemove := []*v1alpha1.Operation{} for _, op := range ownedOps.Items { if !c.oputils.IsOperationReady(&op) { opsToRemove = append(opsToRemove, &op) } else { if availableCacheNumToRemove > 0 { opsToRemove = append(opsToRemove, &op) availableCacheNumToRemove-- } } } c.logger.Info("removing operations", "operations", opsToRemove) if err := c.deleteOperationsAsync(ctx, opsToRemove); err != nil { return reconciler.RequeueWithError(err) } case cacheBalance < 0: if len(ownedOps.Items) < keepAliveCount { // also count not available operations, create new operations to meet the keepAliveCount opsToCreate := []*v1alpha1.Operation{} opsNumToCreate := keepAliveCount - len(ownedOps.Items) for range opsNumToCreate { opName := fmt.Sprintf("cached-operation-%s-%s", c.cache.Status.CacheKey[:8], strings.ToLower(randutils.GenerateRandomString(5))) opToCreate := c.initOperationFromCache(opName) if err := c.setControllerReferenceFunc(c.cache, opToCreate, c.scheme); err != nil { return reconciler.RequeueWithError(err) } opsToCreate = append(opsToCreate, opToCreate) } c.logger.Info("creating operations", "operations", opsToCreate) if err := c.createOperationsAsync(ctx, opsToCreate); err != nil { return reconciler.RequeueWithError(err) } } // else do nothing: we assume that any not ready operations are in progress and will be ready // we can bring in stuck operations handling if we consider that's one case for cache controller to solve } return reconciler.RequeueOnErrorOrContinue(c.updateStatus(ctx)) }