pkg/controller/direct/asset/savedquery_controller.go (274 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // +tool:controller // proto.service: google.cloud.asset.v1.AssetService // proto.message: google.cloud.asset.v1.SavedQuery // crd.type: AssetSavedQuery // crd.version: v1alpha1 package asset import ( "context" "fmt" gcp "cloud.google.com/go/asset/apiv1" pb "cloud.google.com/go/asset/apiv1/assetpb" "google.golang.org/api/option" "google.golang.org/protobuf/types/known/fieldmaskpb" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" krm "github.com/GoogleCloudPlatform/k8s-config-connector/apis/asset/v1alpha1" refs "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/config" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/common" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/directbase" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/registry" ) func init() { registry.RegisterModel(krm.AssetSavedQueryGVK, NewSavedQueryModel) } func NewSavedQueryModel(ctx context.Context, config *config.ControllerConfig) (directbase.Model, error) { return &savedQueryModel{config: *config}, nil } var _ directbase.Model = &savedQueryModel{} type savedQueryModel struct { config config.ControllerConfig } func (m *savedQueryModel) client(ctx context.Context, projectID string) (*gcp.Client, error) { var opts []option.ClientOption config := m.config // Workaround for an unusual behaviour (bug?): // the service requires that a quota project be set if !config.UserProjectOverride || config.BillingProject == "" { config.UserProjectOverride = true config.BillingProject = projectID } opts, err := config.RESTClientOptions() if err != nil { return nil, err } gcpClient, err := gcp.NewRESTClient(ctx, opts...) if err != nil { return nil, fmt.Errorf("building asset saved query client: %w", err) } return gcpClient, err } func (m *savedQueryModel) AdapterForObject(ctx context.Context, reader client.Reader, u *unstructured.Unstructured) (directbase.Adapter, error) { obj := &krm.AssetSavedQuery{} if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &obj); err != nil { return nil, fmt.Errorf("error converting to %T: %w", obj, err) } id, err := krm.NewSavedQueryIdentity(ctx, reader, obj) if err != nil { return nil, err } gcpClient, err := m.client(ctx, id.Parent().ProjectID) if err != nil { return nil, err } return &savedQueryAdapter{ gcpClient: gcpClient, id: id, desired: obj, reader: reader, }, nil } func (m *savedQueryModel) AdapterForURL(ctx context.Context, url string) (directbase.Adapter, error) { // TODO: Support URLs return nil, nil } type savedQueryAdapter struct { gcpClient *gcp.Client id *krm.SavedQueryIdentity desired *krm.AssetSavedQuery actual *pb.SavedQuery reader client.Reader } var _ directbase.Adapter = &savedQueryAdapter{} func (a *savedQueryAdapter) Find(ctx context.Context) (bool, error) { log := klog.FromContext(ctx) log.V(2).Info("getting asset saved query", "name", a.id) name := a.id.String() if a.actual != nil { name = a.actual.Name } req := &pb.GetSavedQueryRequest{Name: name} actual, err := a.gcpClient.GetSavedQuery(ctx, req) if err != nil { if direct.IsNotFound(err) { return false, nil } return false, fmt.Errorf("getting asset saved query %q from gcp: %w", a.id.String(), err) } a.actual = actual return true, nil } func (a *savedQueryAdapter) normalizeReferences(ctx context.Context) error { // No references to resolve in SavedQuery spec currently. return nil } func (a *savedQueryAdapter) Create(ctx context.Context, createOp *directbase.CreateOperation) error { log := klog.FromContext(ctx) log.V(2).Info("creating asset saved query", "name", a.id) mapCtx := &direct.MapContext{} if err := a.normalizeReferences(ctx); err != nil { return fmt.Errorf("normalizing references: %w", err) } desired := a.desired.DeepCopy() resource := AssetSavedQuerySpec_ToProto(mapCtx, &desired.Spec) if mapCtx.Err() != nil { return mapCtx.Err() } // resource.Name = a.id.String() // Name is output only for SavedQuery req := &pb.CreateSavedQueryRequest{ Parent: a.id.Parent().String(), SavedQueryId: a.id.ID(), SavedQuery: resource, } // Attempt to create the saved query actual, err := a.gcpClient.CreateSavedQuery(ctx, req) // Regardless of CreateSavedQuery result (success or AlreadyExists), // we need to ensure a.actual is populated by calling Find. found := false var findErr error if err == nil { log.V(2).Info("successfully created asset saved query in gcp, attempting to fetch it", "name", a.id) a.actual = actual found = true } else if errors.IsAlreadyExists(err) { // Resource already exists. Log and attempt to Find to populate a.actual. log.V(2).Info("asset saved query already exists during creation attempt, attempting to fetch it", "name", a.id.String(), "warning", err) found, findErr = a.Find(ctx) if findErr != nil { return fmt.Errorf("fetching existing asset saved query %q after CreateSavedQuery returned AlreadyExists failed: %w", a.id.String(), findErr) } if !found { // This is unexpected if CreateSavedQuery returned AlreadyExists return fmt.Errorf("asset saved query %q not found even though CreateSavedQuery returned AlreadyExists", a.id.String()) } } else { // Other error during creation return fmt.Errorf("creating asset saved query %s: %w", a.id.String(), err) } // Update status status := AssetSavedQueryStatus_FromProto(mapCtx, a.actual) if mapCtx.Err() != nil { return mapCtx.Err() } return createOp.UpdateStatus(ctx, status, nil) } func (a *savedQueryAdapter) Update(ctx context.Context, updateOp *directbase.UpdateOperation) error { log := klog.FromContext(ctx) log.V(2).Info("updating asset saved query", "name", a.id, "actual", a.actual.Name) mapCtx := &direct.MapContext{} if err := a.normalizeReferences(ctx); err != nil { return fmt.Errorf("normalizing references: %w", err) } desired := a.desired.DeepCopy() resource := AssetSavedQuerySpec_ToProto(mapCtx, &desired.Spec) if mapCtx.Err() != nil { return mapCtx.Err() } resource.Name = a.actual.Name // Name must be set for update based on actual state // Construct the protobuf message containing only the fields to update updateProto := &pb.SavedQuery{Name: a.actual.Name} // Create a proto diff and build the update mask diffPathsSet, err := common.CompareProtoMessage(resource, a.actual, common.BasicDiff) if err != nil { return fmt.Errorf("comparing proto messages: %w", err) } // Filter out output-only fields from paths mutablePathsSet := sets.New[string]() for _, p := range diffPathsSet.UnsortedList() { // Iterate over the set's elements // SavedQuery only allows updating 'description', 'labels', 'content' if p == "description" || p == "labels" || p == "content" { mutablePathsSet.Insert(p) // Insert the string path } } if mutablePathsSet.Len() == 0 { // Check the size of the set log.V(2).Info("no mutable fields need update", "name", a.id) // Update status even if no fields changed in spec status := AssetSavedQueryStatus_FromProto(mapCtx, a.actual) if mapCtx.Err() != nil { return mapCtx.Err() } return updateOp.UpdateStatus(ctx, status, nil) } updateMaskPaths := mutablePathsSet.UnsortedList() // Get the slice for the field mask log.V(2).Info("updating asset saved query fields", "paths", updateMaskPaths) // Populate the updateProto with desired values for the paths being updated if mutablePathsSet.Has("description") { // Keep using the set for efficient checks updateProto.Description = resource.Description } if mutablePathsSet.Has("labels") { updateProto.Labels = resource.Labels } if mutablePathsSet.Has("content") { updateProto.Content = resource.Content } updateMask := &fieldmaskpb.FieldMask{Paths: updateMaskPaths} // Use the slice here req := &pb.UpdateSavedQueryRequest{ SavedQuery: updateProto, UpdateMask: updateMask, } actual, err := a.gcpClient.UpdateSavedQuery(ctx, req) if err != nil { return fmt.Errorf("updating asset saved query %s: %w", a.id.String(), err) } a.actual = actual // Update internal actual state with the response log.V(2).Info("successfully updated asset saved query", "name", a.id) status := AssetSavedQueryStatus_FromProto(mapCtx, a.actual) if mapCtx.Err() != nil { return mapCtx.Err() } return updateOp.UpdateStatus(ctx, status, nil) } func (a *savedQueryAdapter) Export(ctx context.Context) (*unstructured.Unstructured, error) { if a.actual == nil { return nil, fmt.Errorf("Find() must be called before Export()") } mapCtx := &direct.MapContext{} obj := &krm.AssetSavedQuery{} obj.Spec = direct.ValueOf(AssetSavedQuerySpec_FromProto(mapCtx, a.actual)) if mapCtx.Err() != nil { return nil, mapCtx.Err() } // Set parent reference from the 'name' field parentRef, id, err := krm.ParseSavedQueryExternal(a.actual.Name) if err != nil { return nil, fmt.Errorf("parsing parent from name %q: %w", a.actual.Name, err) } if parentRef.ProjectID != "" { obj.Spec.Parent.ProjectRef = &refs.ProjectRef{External: parentRef.String()} } else if parentRef.FolderID != "" { obj.Spec.Parent.FolderRef = &refs.FolderRef{External: parentRef.String()} } else if parentRef.OrganizationID != "" { obj.Spec.Parent.OrganizationRef = &refs.OrganizationRef{External: parentRef.String()} } else { return nil, fmt.Errorf("unknown parent type in name %q", a.actual.Name) } // Set observed state status := AssetSavedQueryStatus_FromProto(mapCtx, a.actual) if mapCtx.Err() != nil { return nil, mapCtx.Err() } obj.Status = *status // Convert to unstructured uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { return nil, fmt.Errorf("converting AssetSavedQuery to unstructured: %w", err) } u := &unstructured.Unstructured{Object: uObj} // Set standard Kubernetes metadata u.SetGroupVersionKind(krm.AssetSavedQueryGVK) u.SetName(id) // Use the parsed ID from the name field u.SetNamespace(a.desired.Namespace) // Retain labels and annotations from the original object if needed u.SetLabels(a.desired.Labels) u.SetAnnotations(a.desired.Annotations) // Set external reference annotation if available if status.ExternalRef != nil { annotations := u.GetAnnotations() if annotations == nil { annotations = make(map[string]string) } annotations["cnrm.cloud.google.com/external-ref"] = *status.ExternalRef u.SetAnnotations(annotations) } return u, nil } func (a *savedQueryAdapter) Delete(ctx context.Context, deleteOp *directbase.DeleteOperation) (bool, error) { log := klog.FromContext(ctx) log.V(2).Info("deleting asset saved query", "name", a.id) req := &pb.DeleteSavedQueryRequest{Name: a.id.String()} err := a.gcpClient.DeleteSavedQuery(ctx, req) if err != nil { if direct.IsNotFound(err) { // Return success if not found (assume it was already deleted). log.V(2).Info("skipping delete for non-existent asset saved query, assuming it was already deleted", "name", a.id) return true, nil } return false, fmt.Errorf("deleting asset saved query %s: %w", a.id.String(), err) } log.V(2).Info("successfully deleted asset saved query", "name", a.id) return true, nil } // Helper function to get owner reference. Needed for parent resolution. func ownerFromObject(obj metav1.Object) *types.NamespacedName { return &types.NamespacedName{ Namespace: obj.GetNamespace(), Name: obj.GetName(), } }