pkg/controller/direct/alloydb/cluster_controller.go (541 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package alloydb import ( "context" "fmt" "strings" krm "github.com/GoogleCloudPlatform/k8s-config-connector/apis/alloydb/v1beta1" refs "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/config" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/common" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/directbase" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/registry" gcp "cloud.google.com/go/alloydb/apiv1beta" alloydbpb "cloud.google.com/go/alloydb/apiv1beta/alloydbpb" "github.com/golang/protobuf/ptypes/duration" "google.golang.org/api/option" "google.golang.org/genproto/googleapis/type/dayofweek" "google.golang.org/genproto/googleapis/type/timeofday" "google.golang.org/protobuf/types/known/fieldmaskpb" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) func init() { registry.RegisterModel(krm.AlloyDBClusterGVK, NewClusterModel) } func NewClusterModel(ctx context.Context, config *config.ControllerConfig) (directbase.Model, error) { return &modelCluster{config: *config}, nil } var _ directbase.Model = &modelCluster{} type modelCluster struct { config config.ControllerConfig } func (m *modelCluster) client(ctx context.Context) (*gcp.AlloyDBAdminClient, error) { var opts []option.ClientOption opts, err := m.config.RESTClientOptions() if err != nil { return nil, err } gcpClient, err := gcp.NewAlloyDBAdminRESTClient(ctx, opts...) if err != nil { return nil, fmt.Errorf("building Cluster client: %w", err) } return gcpClient, err } func (m *modelCluster) MapSecretToResources(ctx context.Context, reader client.Reader, secret corev1.Secret) ([]reconcile.Request, error) { log := klog.FromContext(ctx) log.V(2).Info("mapping secret to AlloyDBClusters", "secret", fmt.Sprintf("%v/%v", secret.GetNamespace(), secret.GetName())) us := &unstructured.UnstructuredList{} us.SetAPIVersion("alloydb.cnrm.cloud.google.com/v1beta1") us.SetKind("AlloyDBCluster") if err := reader.List(ctx, us, &client.ListOptions{Namespace: secret.GetNamespace()}); err != nil { return nil, fmt.Errorf("listing AlloyDBCluster under namespace %v: %w", secret.GetNamespace(), err) } requests := make([]reconcile.Request, 0) for _, cluster := range us.Items { secretName, _, err := unstructured.NestedString(cluster.Object, "spec", "initialUser", "password", "valueFrom", "secretKeyRef", "name") if err != nil { return nil, fmt.Errorf("getting 'spec.initialUser.password.valueFrom.secretKeyRef.name' in unstructured AlloyDBCluster %v/%v: %w", cluster.GetNamespace(), cluster.GetName(), err) } if secretName == secret.GetName() { log.Info("found AlloyDBCluster relying on secret", "name", fmt.Sprintf("%v/%v", cluster.GetNamespace(), cluster.GetName()), "secret", fmt.Sprintf("%v/%v", secret.GetNamespace(), secret.GetName())) requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ Name: cluster.GetName(), // Reconcile the AlloyDBCluster which referenced the given K8s Secret. Namespace: cluster.GetNamespace(), }, }) } } return requests, nil } func (m *modelCluster) AdapterForObject(ctx context.Context, reader client.Reader, u *unstructured.Unstructured) (directbase.Adapter, error) { obj := &krm.AlloyDBCluster{} if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &obj); err != nil { return nil, fmt.Errorf("error converting to %T: %w", obj, err) } id, err := krm.NewClusterIdentity(ctx, reader, obj) if err != nil { return nil, err } // Get alloydb GCP client gcpClient, err := m.client(ctx) if err != nil { return nil, err } return &ClusterAdapter{ id: id, gcpClient: gcpClient, desired: obj, reader: reader, }, nil } func (m *modelCluster) AdapterForURL(ctx context.Context, url string) (directbase.Adapter, error) { // TODO: Support URLs return nil, nil } type ClusterAdapter struct { id *krm.ClusterIdentity gcpClient *gcp.AlloyDBAdminClient desired *krm.AlloyDBCluster actual *alloydbpb.Cluster reader client.Reader } var _ directbase.Adapter = &ClusterAdapter{} // Find retrieves the GCP resource. // Return true means the object is found. This triggers Adapter `Update` call. // Return false means the object is not found. This triggers Adapter `Create` call. // Return a non-nil error requeues the requests. func (a *ClusterAdapter) Find(ctx context.Context) (bool, error) { log := klog.FromContext(ctx) log.V(2).Info("getting Cluster", "name", a.id) req := &alloydbpb.GetClusterRequest{Name: a.id.String()} clusterpb, err := a.gcpClient.GetCluster(ctx, req) if err != nil { if direct.IsNotFound(err) { return false, nil } return false, fmt.Errorf("getting Cluster %q: %w", a.id, err) } a.actual = clusterpb return true, nil } // TODO: Scenario test cases: both networkConfig.networkRef and networkRef set; none set. func (a *ClusterAdapter) resolveNetworkRef(ctx context.Context) error { obj := a.desired if obj.Spec.NetworkRef == nil && obj.Spec.NetworkConfig == nil { return fmt.Errorf("at least one of 'spec.networkRef' " + "and 'spec.networkConfig' should be configured: neither is configured") } if obj.Spec.NetworkRef != nil && obj.Spec.NetworkConfig != nil { return fmt.Errorf("only one of 'spec.networkRef' and " + "'spec.networkConfig' should be configured: both are configured; " + "recommend using 'spec.networkConfig'") } if obj.Spec.NetworkRef != nil { obj.Spec.NetworkConfig = &krm.Cluster_NetworkConfig{ NetworkRef: obj.Spec.NetworkRef, } obj.Spec.NetworkRef = nil } if obj.Spec.NetworkConfig.NetworkRef == nil { return fmt.Errorf("'spec.networkConfig.networkRef' is required when" + "'spec.networkConfig' is configured") } if err := obj.Spec.NetworkConfig.NetworkRef.Normalize(ctx, a.reader, obj); err != nil { return err } return nil } func (a *ClusterAdapter) normalizeReferences(ctx context.Context) error { obj := a.desired if err := a.resolveNetworkRef(ctx); err != nil { return err } if obj.Spec.AutomatedBackupPolicy != nil && obj.Spec.AutomatedBackupPolicy.EncryptionConfig != nil && obj.Spec.AutomatedBackupPolicy.EncryptionConfig.KMSKeyNameRef != nil { key, err := refs.ResolveKMSCryptoKeyRef(ctx, a.reader, obj, obj.Spec.AutomatedBackupPolicy.EncryptionConfig.KMSKeyNameRef) if err != nil { return err } obj.Spec.AutomatedBackupPolicy.EncryptionConfig.KMSKeyNameRef = key } if obj.Spec.ContinuousBackupConfig != nil && obj.Spec.ContinuousBackupConfig.EncryptionConfig != nil && obj.Spec.ContinuousBackupConfig.EncryptionConfig.KMSKeyNameRef != nil { key, err := refs.ResolveKMSCryptoKeyRef(ctx, a.reader, obj, obj.Spec.ContinuousBackupConfig.EncryptionConfig.KMSKeyNameRef) if err != nil { return err } obj.Spec.ContinuousBackupConfig.EncryptionConfig.KMSKeyNameRef = key } if obj.Spec.EncryptionConfig != nil && obj.Spec.EncryptionConfig.KMSKeyNameRef != nil { key, err := refs.ResolveKMSCryptoKeyRef(ctx, a.reader, obj, obj.Spec.EncryptionConfig.KMSKeyNameRef) if err != nil { return err } obj.Spec.EncryptionConfig.KMSKeyNameRef = key } if obj.Spec.RestoreBackupSource != nil && obj.Spec.RestoreBackupSource.BackupNameRef != nil { backup, err := refs.ResolveAlloyDBBackupRef(ctx, a.reader, obj, obj.Spec.RestoreBackupSource.BackupNameRef) if err != nil { return err } obj.Spec.RestoreBackupSource.BackupNameRef = backup } if obj.Spec.RestoreContinuousBackupSource != nil && obj.Spec.RestoreContinuousBackupSource.ClusterRef != nil { external, err := obj.Spec.RestoreContinuousBackupSource.ClusterRef.NormalizedExternal(ctx, a.reader, obj.Namespace) if err != nil { return err } obj.Spec.RestoreContinuousBackupSource.ClusterRef.External = external } if obj.Spec.SecondaryConfig != nil && obj.Spec.SecondaryConfig.PrimaryClusterNameRef != nil { external, err := obj.Spec.SecondaryConfig.PrimaryClusterNameRef.NormalizedExternal(ctx, a.reader, obj.Namespace) if err != nil { return err } obj.Spec.SecondaryConfig.PrimaryClusterNameRef.External = external } return nil } // TODO: Scenario test case: ContinuousBackupConfig.Enabled unset. func (a *ClusterAdapter) resolveKRMDefaultsForCreate() { obj := a.desired if obj.Spec.ClusterType == nil || direct.ValueOf(obj.Spec.ClusterType) == "" { obj.Spec.ClusterType = direct.LazyPtr("PRIMARY") } if obj.Spec.ContinuousBackupConfig != nil && obj.Spec.ContinuousBackupConfig.Enabled == nil { obj.Spec.ContinuousBackupConfig.Enabled = direct.PtrTo(true) } if obj.Spec.DeletionPolicy == nil || direct.ValueOf(obj.Spec.DeletionPolicy) == "" { obj.Spec.DeletionPolicy = direct.LazyPtr("DEFAULT") } } func (a *ClusterAdapter) resolveKRMDefaultsForUpdate() { a.resolveKRMDefaultsForCreate() obj := a.desired // This is needed for only update because the returned actual state has both // fields set to the same value. if obj.Spec.NetworkRef == nil && obj.Spec.NetworkConfig != nil && obj.Spec.NetworkConfig.NetworkRef != nil { obj.Spec.NetworkRef = &refs.ComputeNetworkRef{ External: obj.Spec.NetworkConfig.NetworkRef.External, } } else if (obj.Spec.NetworkConfig == nil || obj.Spec.NetworkConfig.NetworkRef == nil) && obj.Spec.NetworkRef != nil { if obj.Spec.NetworkConfig == nil { obj.Spec.NetworkConfig = &krm.Cluster_NetworkConfig{} } obj.Spec.NetworkConfig.NetworkRef = &refs.ComputeNetworkRef{ External: obj.Spec.NetworkRef.External, } } } // TODO: Scenario test case: Update initialUser.password from `value` to `valueFrom` and vise versa. func (a *ClusterAdapter) resolveInitialUserPasswordField(ctx context.Context) error { obj := a.desired if obj.Spec.InitialUser == nil || obj.Spec.InitialUser.Password == nil { return nil } // Resolve sensitive field 'spec.initialUser.password' when it is set. if err := direct.ResolveSensitiveField(ctx, obj.Spec.InitialUser.Password, "spec.initialUser.password", obj.Namespace, a.reader); err != nil { return err } return nil } // TODO: Test once backup is supported or using scenario: set restoreBackupSource and restoreContinuousBackupSource (either and both). // Create creates the resource in GCP based on `spec` and update the Config Connector object `status` based on the GCP response. func (a *ClusterAdapter) Create(ctx context.Context, createOp *directbase.CreateOperation) error { log := klog.FromContext(ctx) log.V(2).Info("creating Cluster", "name", a.id) mapCtx := &direct.MapContext{} // 1. Resolve reference fields. if err := a.normalizeReferences(ctx); err != nil { return fmt.Errorf("normalizing reference for creation: %w", err) } // 2. Resolve secret field. if err := a.resolveInitialUserPasswordField(ctx); err != nil { return err } // 3. Set default fields that were set by the Terraform library for compatibility. a.resolveKRMDefaultsForCreate() // 4. Validate mutually-exclusive fields. if a.desired.Spec.RestoreBackupSource != nil && a.desired.Spec.RestoreContinuousBackupSource != nil { return fmt.Errorf("only one of 'spec.restoreBackupSource' " + "and 'spec.restoreContinuousBackupSource' can be configured: " + "both are configured") } desired := a.desired.DeepCopy() resource := AlloyDBClusterSpec_ToProto(mapCtx, &desired.Spec) if mapCtx.Err() != nil { return mapCtx.Err() } // 5. Handle labels. resource.Labels = make(map[string]string) for k, v := range a.desired.GetObjectMeta().GetLabels() { resource.Labels[k] = v } resource.Labels["managed-by-cnrm"] = "true" var created *alloydbpb.Cluster if desired.Spec.RestoreBackupSource != nil || desired.Spec.RestoreContinuousBackupSource != nil { req := &alloydbpb.RestoreClusterRequest{ Parent: a.id.Parent().String(), ClusterId: a.id.ID(), Cluster: resource, } if desired.Spec.RestoreBackupSource != nil { backupSource := BackupSource_ToProto(mapCtx, desired.Spec.RestoreBackupSource) if mapCtx.Err() != nil { return mapCtx.Err() } createOp.RecordUpdatingEvent() req.Source = &alloydbpb.RestoreClusterRequest_BackupSource{ BackupSource: backupSource, } op, err := a.gcpClient.RestoreCluster(ctx, req) if err != nil { log.V(2).Info("error creating Cluster based on a backup source", "name", a.id, "error", err) return fmt.Errorf("creating Cluster %s based on a backup source: %w", a.id, err) } created, err = op.Wait(ctx) if err != nil { log.V(2).Info("error waiting for op creating Cluster based on a backup source", "name", a.id, "error", err) return fmt.Errorf("waiting for op creating Cluster %s based on a backup source: %w", a.id, err) } log.V(2).Info("successfully creating Cluster based on a backup source", "name", a.id) } else if desired.Spec.RestoreContinuousBackupSource != nil { continuousBackupSource := ContinuousBackupSource_ToProto(mapCtx, desired.Spec.RestoreContinuousBackupSource) if mapCtx.Err() != nil { return mapCtx.Err() } createOp.RecordUpdatingEvent() req.Source = &alloydbpb.RestoreClusterRequest_ContinuousBackupSource{ ContinuousBackupSource: continuousBackupSource, } op, err := a.gcpClient.RestoreCluster(ctx, req) if err != nil { log.V(2).Info("error creating Cluster based on a source cluster", "name", a.id, "error", err) return fmt.Errorf("creating Cluster %s based on a source cluster: %w", a.id, err) } created, err = op.Wait(ctx) if err != nil { log.V(2).Info("error waiting for op creating Cluster based on a source cluster", "name", a.id, "error", err) return fmt.Errorf("waiting for op creating Cluster %s based on a source cluster: %w", a.id, err) } log.V(2).Info("successfully creating Cluster based on a source cluster", "name", a.id) } return a.updateStatus(ctx, mapCtx, createOp, created) } if resource.ClusterType == alloydbpb.Cluster_SECONDARY { if resource.SecondaryConfig == nil { return fmt.Errorf("cannot create secondary cluster %s without secondaryConfig", a.id) } createOp.RecordUpdatingEvent() req := &alloydbpb.CreateSecondaryClusterRequest{ Parent: a.id.Parent().String(), ClusterId: a.id.ID(), Cluster: resource, } op, err := a.gcpClient.CreateSecondaryCluster(ctx, req) if err != nil { log.V(2).Info("error creating secondary Cluster", "name", a.id, "error", err) return fmt.Errorf("creating secondary Cluster %s: %w", a.id, err) } created, err = op.Wait(ctx) if err != nil { log.V(2).Info("error waiting for secondary Cluster creation op", "name", a.id, "error", err) return fmt.Errorf("secondary Cluster %s waiting creation: %w", a.id, err) } log.V(2).Info("successfully created secondary Cluster", "name", a.id) } else { if resource.SecondaryConfig != nil { return fmt.Errorf("cannot create primary cluster %s with secondaryConfig", a.id) } createOp.RecordUpdatingEvent() req := &alloydbpb.CreateClusterRequest{ Parent: a.id.Parent().String(), ClusterId: a.id.ID(), Cluster: resource, } op, err := a.gcpClient.CreateCluster(ctx, req) if err != nil { log.V(2).Info("error creating primary Cluster", "name", a.id, "error", err) return fmt.Errorf("creating primary Cluster %s: %w", a.id, err) } created, err = op.Wait(ctx) if err != nil { log.V(2).Info("error waiting for primary Cluster creation op", "name", a.id, "error", err) return fmt.Errorf("primary Cluster %s waiting creation: %w", a.id, err) } log.V(2).Info("successfully created Cluster", "name", a.id) } return a.updateStatus(ctx, mapCtx, createOp, created) } func (a *ClusterAdapter) updateStatus(ctx context.Context, mapCtx *direct.MapContext, createOp *directbase.CreateOperation, reconciledCluster *alloydbpb.Cluster) error { status := AlloyDBClusterStatus_FromProto(mapCtx, reconciledCluster) if mapCtx.Err() != nil { return mapCtx.Err() } status.ExternalRef = direct.LazyPtr(a.id.String()) return createOp.UpdateStatus(ctx, status, nil) } func (a *ClusterAdapter) resolveGCPDefaults(desired *alloydbpb.Cluster, actual *alloydbpb.Cluster) { if desired.AutomatedBackupPolicy == nil { desired.AutomatedBackupPolicy = &alloydbpb.AutomatedBackupPolicy{} } if desired.AutomatedBackupPolicy.BackupWindow == nil { desired.AutomatedBackupPolicy.BackupWindow = direct.PtrTo(duration.Duration{Seconds: 3600}) } if desired.AutomatedBackupPolicy.Enabled == nil { desired.AutomatedBackupPolicy.Enabled = direct.PtrTo(false) } if desired.AutomatedBackupPolicy.Location == "" { desired.AutomatedBackupPolicy.Location = a.id.Parent().Location } if desired.AutomatedBackupPolicy.Retention == nil { desired.AutomatedBackupPolicy.Retention = &alloydbpb.AutomatedBackupPolicy_TimeBasedRetention_{ TimeBasedRetention: &alloydbpb.AutomatedBackupPolicy_TimeBasedRetention{ RetentionPeriod: direct.PtrTo((duration.Duration{Seconds: 1209600})), }, } } if desired.AutomatedBackupPolicy.Schedule == nil { desired.AutomatedBackupPolicy.Schedule = &alloydbpb.AutomatedBackupPolicy_WeeklySchedule_{ WeeklySchedule: &alloydbpb.AutomatedBackupPolicy_WeeklySchedule{ DaysOfWeek: []dayofweek.DayOfWeek{ dayofweek.DayOfWeek_MONDAY, dayofweek.DayOfWeek_TUESDAY, dayofweek.DayOfWeek_WEDNESDAY, dayofweek.DayOfWeek_THURSDAY, dayofweek.DayOfWeek_FRIDAY, dayofweek.DayOfWeek_SATURDAY, dayofweek.DayOfWeek_SUNDAY, }, StartTimes: []*timeofday.TimeOfDay{ {Hours: 23}, }, }, } } if desired.ContinuousBackupConfig == nil { desired.ContinuousBackupConfig = &alloydbpb.ContinuousBackupConfig{} } if desired.ContinuousBackupConfig.Enabled == nil { desired.ContinuousBackupConfig.Enabled = direct.PtrTo(true) } if desired.ContinuousBackupConfig.RecoveryWindowDays == 0 { desired.ContinuousBackupConfig.RecoveryWindowDays = 14 } if desired.GeminiConfig == nil { desired.GeminiConfig = &alloydbpb.GeminiClusterConfig{} } if desired.SubscriptionType == alloydbpb.SubscriptionType_SUBSCRIPTION_TYPE_UNSPECIFIED { desired.SubscriptionType = alloydbpb.SubscriptionType_STANDARD } desired.DatabaseVersion = actual.DatabaseVersion desired.Source = actual.Source } // Update updates the resource in GCP based on `spec` and update the Config Connector object `status` based on the GCP response. func (a *ClusterAdapter) Update(ctx context.Context, updateOp *directbase.UpdateOperation) error { log := klog.FromContext(ctx) log.V(2).Info("updating Cluster", "name", a.id) mapCtx := &direct.MapContext{} // TODO: Check immutability for optional and immutable fields. // 1. Resolve reference fields. if err := a.normalizeReferences(ctx); err != nil { return fmt.Errorf("normalizing reference for update: %w", err) } // 2. Resolve secret field. if err := a.resolveInitialUserPasswordField(ctx); err != nil { return err } // 3. Set default fields that were set in the actual state. a.resolveKRMDefaultsForUpdate() // 4. Validate mutually-exclusive fields. if a.desired.Spec.RestoreBackupSource != nil && a.desired.Spec.RestoreContinuousBackupSource != nil { return fmt.Errorf("only one of 'spec.restoreBackupSource' " + "and 'spec.restoreContinuousBackupSource' can be configured: " + "both are configured") } desiredPb := AlloyDBClusterSpec_ToProto(mapCtx, &a.desired.DeepCopy().Spec) if mapCtx.Err() != nil { return mapCtx.Err() } // 5. Handle labels. desiredPb.Labels = make(map[string]string) for k, v := range a.desired.GetObjectMeta().GetLabels() { desiredPb.Labels[k] = v } desiredPb.Labels["managed-by-cnrm"] = "true" // 6. Set resource name. This step is not needed for other operations. desiredPb.Name = a.id.String() // 7. Handle default values for fields not yet supported in KRM types. a.resolveGCPDefaults(desiredPb, a.actual) paths, err := common.CompareProtoMessage(desiredPb, a.actual, common.BasicDiff) if err != nil { return err } // TODO: Figure out how to keep the network immutable. // The returned network value in the actual state is in the format of // "projects/[projectNumber]/networks/[networkID]", but the resolved network // in the desired state is in the format of // "projects/[projectID]/networks/[networkID]". So there is always a diff. // However, network is an immutable field, and always having a diff will // block proper updates or normal re-reconciliation. // To unblock the direct migration, let's drop the network fields // ("network_config.network" and "network") for now. But we need to figure // out the right way to check network immutability (e.g. persist the // applied value under status.observedState). paths.Delete("network_config.network") paths.Delete("network") if len(paths) == 0 { log.V(2).Info("no field needs update", "name", a.id) if *a.desired.Status.ExternalRef == "" { // If it is the first reconciliation after switching to direct controller, // or is an acquisition, then update Status to fill out the ExternalRef // and ObservedState. status := AlloyDBClusterStatus_FromProto(mapCtx, a.actual) if mapCtx.Err() != nil { return mapCtx.Err() } status.ExternalRef = direct.LazyPtr(a.id.String()) return updateOp.UpdateStatus(ctx, status, nil) } return nil } // TODO: Decide if we want to clean up default fields set in desired state. topLevelFieldPaths := sets.New[string]() for path, _ := range paths { tokens := strings.Split(path, ".") topLevelFieldPaths.Insert(tokens[0]) } updateMask := &fieldmaskpb.FieldMask{ Paths: sets.List(topLevelFieldPaths), } updateOp.RecordUpdatingEvent() req := &alloydbpb.UpdateClusterRequest{ UpdateMask: updateMask, Cluster: desiredPb, } op, err := a.gcpClient.UpdateCluster(ctx, req) if err != nil { log.V(2).Info("error updating Cluster", "name", a.id, "error", err) return fmt.Errorf("updating Cluster %s: %w", a.id, err) } updated, err := op.Wait(ctx) if err != nil { log.V(2).Info("error waiting for Cluster update op", "name", a.id, "error", err) return fmt.Errorf("Cluster %s waiting update: %w", a.id.String(), err) } log.V(2).Info("successfully updated Cluster", "name", a.id) status := AlloyDBClusterStatus_FromProto(mapCtx, updated) if mapCtx.Err() != nil { return mapCtx.Err() } if *a.desired.Status.ExternalRef == "" { // If it is the first reconciliation after switching to direct controller, // or is an acquisition with update, then fill out the ExternalRef. status.ExternalRef = direct.LazyPtr(a.id.String()) } return updateOp.UpdateStatus(ctx, status, nil) } // Export maps the GCP object to a Config Connector resource `spec`. func (a *ClusterAdapter) Export(ctx context.Context) (*unstructured.Unstructured, error) { if a.actual == nil { return nil, fmt.Errorf("Find() not called") } u := &unstructured.Unstructured{} obj := &krm.AlloyDBCluster{} mapCtx := &direct.MapContext{} obj.Spec = direct.ValueOf(AlloyDBClusterSpec_FromProto(mapCtx, a.actual)) if mapCtx.Err() != nil { return nil, mapCtx.Err() } obj.Spec.ProjectRef = &refs.ProjectRef{External: a.id.Parent().ProjectID} obj.Spec.Location = direct.PtrTo(a.id.Parent().Location) uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { return nil, err } u.SetName(a.actual.Name) u.SetGroupVersionKind(krm.AlloyDBClusterGVK) u.Object = uObj return u, nil } // TODO: Scenario test case: Delete after the cluster is gone; not forcing delete a secondary cluster. // Delete the resource from GCP service when the corresponding Config Connector resource is deleted. func (a *ClusterAdapter) Delete(ctx context.Context, deleteOp *directbase.DeleteOperation) (bool, error) { log := klog.FromContext(ctx) log.V(2).Info("deleting Cluster", "name", a.id) req := &alloydbpb.DeleteClusterRequest{ Name: a.id.String(), Force: direct.ValueOf(a.desired.Spec.DeletionPolicy) == "FORCE", } op, err := a.gcpClient.DeleteCluster(ctx, req) if err != nil { if direct.IsNotFound(err) { return true, nil } return false, fmt.Errorf("deleting Cluster %s: %w", a.id, err) } log.V(2).Info("successfully deleted Cluster", "name", a.id) err = op.Wait(ctx) if err != nil { return false, fmt.Errorf("waiting delete Cluster %s: %w", a.id, err) } return true, nil }