ingestor/storage/crds.go (117 lines of code) (raw):

package storage import ( "context" "errors" "fmt" "reflect" adxmonv1 "github.com/Azure/adx-mon/api/v1" "github.com/Azure/adx-mon/pkg/scheduler" meta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) type ListFilterFunc func(client.Object) bool // FilterCompleted will filter out objects that are completed, as determined // by their Conditions being in a "True" state and the ObservedGeneration // matching the current generation of the object. func FilterCompleted(obj client.Object) bool { statusObj, ok := obj.(adxmonv1.ConditionedObject) if !ok { return false } condition := statusObj.GetCondition() return condition != nil && condition.Status == metav1.ConditionTrue && condition.ObservedGeneration == obj.GetGeneration() } type CRDHandler interface { List(ctx context.Context, list client.ObjectList, filters ...ListFilterFunc) error UpdateStatus(ctx context.Context, obj client.Object, errStatus error) error } type crdHandler struct { Client client.Client Elector scheduler.Elector } func NewCRDHandler(client client.Client, elector scheduler.Elector) CRDHandler { return &crdHandler{ Client: client, Elector: elector, } } func (c *crdHandler) List(ctx context.Context, list client.ObjectList, filters ...ListFilterFunc) error { if c.Elector != nil && !c.Elector.IsLeader() { return nil } // TODO (jesthom) Method is invoked for each database for each task, we probably want to // switch to some sort of shared cache. // controller-runtime implements such a caching mechanism // // mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) // then whenever you retrieve a client, you intherit the caching mechanism // client := mgr.GetClient() if c.Client == nil { return errors.New("no client provided") } if err := c.Client.List(ctx, list); err != nil { return fmt.Errorf("failed to list CRDs: %w", err) } var filtered []runtime.Object err := meta.EachListItem(list, func(item runtime.Object) error { obj, ok := item.(client.Object) if !ok { return nil } for _, filter := range filters { if filter(obj) { return nil } } filtered = append(filtered, obj) return nil }) if err != nil { return fmt.Errorf("failed to filter list items: %w", err) } meta.SetList(list, filtered) return nil } func (c *crdHandler) UpdateStatus(ctx context.Context, obj client.Object, errStatus error) error { if c.Client == nil { return errors.New("no client provided") } statusObj, ok := obj.(adxmonv1.ConditionedObject) if !ok { return errors.New("object does not implement ConditionedObject") } var ( status = metav1.ConditionTrue message = "" ) if errStatus != nil { status = metav1.ConditionFalse message = errStatus.Error() } condition := metav1.Condition{ Status: status, Message: message, } statusObj.SetCondition(condition) if err := c.Client.Status().Update(ctx, obj); err != nil { return fmt.Errorf("failed to update status: %w", err) } return nil } func ConvertToTypedList(objects []client.Object, list client.ObjectList) error { // Get the value of the list object listValue := reflect.ValueOf(list).Elem() // Find the Items field itemsField := listValue.FieldByName("Items") if !itemsField.IsValid() { return fmt.Errorf("list object doesn't have Items field") } // Get the type of the slice elements itemsType := itemsField.Type().Elem() // Create a new slice to hold the converted items newSlice := reflect.MakeSlice(reflect.SliceOf(itemsType), 0, len(objects)) // Convert each object to the right type for _, obj := range objects { // Check if the object can be converted to the target type objValue := reflect.ValueOf(obj) if !objValue.Type().ConvertibleTo(itemsType) { // Try pointer/value conversion if objValue.Type().Elem().ConvertibleTo(itemsType) { // We have *Type but need Type newSlice = reflect.Append(newSlice, objValue.Elem()) } else if objValue.Type().ConvertibleTo(reflect.PointerTo(itemsType)) { // We have Type but need *Type newSlice = reflect.Append(newSlice, objValue.Addr()) } else { return fmt.Errorf("cannot convert %T to slice element type %s", obj, itemsType) } } else { newSlice = reflect.Append(newSlice, objValue) } } // Set the Items field itemsField.Set(newSlice) return nil }