ingestor/storage/kql_functions.go (111 lines of code) (raw):

package storage import ( "context" "errors" "fmt" "github.com/Azure/adx-mon/pkg/logger" "github.com/Azure/adx-mon/pkg/scheduler" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" adxmonv1 "github.com/Azure/adx-mon/api/v1" ) const ( // name of our custom finalizer FinalizerName = "function.adx-mon.azure.com/finalizer" ) type Functions interface { UpdateStatus(ctx context.Context, fn *adxmonv1.Function) error Update(ctx context.Context, fn *adxmonv1.Function) error List(ctx context.Context) ([]*adxmonv1.Function, error) } type functions struct { Client client.Client Elector scheduler.Elector } func NewFunctions(client client.Client, elector scheduler.Elector) *functions { return &functions{ Client: client, Elector: elector, } } func (f *functions) Update(ctx context.Context, fn *adxmonv1.Function) error { if f.Client == nil { return errors.New("no client provided") } if err := f.Client.Update(ctx, fn); err != nil { logger.Errorf("Failed to update function %s: %v", fn.Name, err) return err } return nil } func (f *functions) UpdateStatus(ctx context.Context, fn *adxmonv1.Function) error { if f.Client == nil { return errors.New("no client provided") } if fn.Status.Status == adxmonv1.Success { fn.Status.ObservedGeneration = fn.GetGeneration() fn.Status.Error = "" if !fn.DeletionTimestamp.IsZero() && controllerutil.ContainsFinalizer(fn, FinalizerName) { // remove our finalizer from the list and update it. controllerutil.RemoveFinalizer(fn, FinalizerName) if err := f.Client.Update(ctx, fn); err != nil { logger.Errorf("Failed to remove finalizer from function %s: %v", fn.Name, err) fn.Status.Status = adxmonv1.Failed } else { return nil } } } fn.Status.LastTimeReconciled = metav1.Now() return f.Client.Status().Update(ctx, fn) } func (f *functions) List(ctx context.Context) ([]*adxmonv1.Function, error) { if f.Client == nil { return nil, fmt.Errorf("no client provided") } if f.Elector != nil && !f.Elector.IsLeader() { return nil, nil } list := &adxmonv1.FunctionList{} if err := f.Client.List(ctx, list); err != nil { if errors.Is(err, &meta.NoKindMatchError{}) || errors.Is(err, &meta.NoResourceMatchError{}) { return nil, nil } return nil, fmt.Errorf("failed to list functions: %w", err) } var fns []*adxmonv1.Function for _, fn := range list.Items { if fn.Spec.Suspend != nil && *fn.Spec.Suspend { // Skip suspended functions continue } if !fn.GetDeletionTimestamp().IsZero() { fn.Status.Reason = "Function deleted" } else { switch fn.GetGeneration() { case fn.Status.ObservedGeneration: // Skip functions that are up to date continue case 1: fn.Status.Reason = "Function created" default: fn.Status.Reason = "Function updated" } if err := f.ensureFinalizer(ctx, &fn); err != nil { logger.Errorf("Failed to ensure finalizer for function %s: %v", fn.Name, err) } } fns = append(fns, &fn) } return fns, nil } func (f *functions) ensureFinalizer(ctx context.Context, fn *adxmonv1.Function) error { if f.Client == nil { return fmt.Errorf("no client provided") } // examine DeletionTimestamp to determine if object is under deletion if fn.ObjectMeta.DeletionTimestamp.IsZero() { // The object is not being deleted, so if it does not have our finalizer, // then lets add the finalizer and update the object. This is equivalent // to registering our finalizer. if !controllerutil.ContainsFinalizer(fn, FinalizerName) { controllerutil.AddFinalizer(fn, FinalizerName) return f.Client.Update(ctx, fn) } } return nil }