pkg/controller/direct/alloydb/instance_controller.go (359 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package alloydb import ( "context" "fmt" "reflect" "strings" krm "github.com/GoogleCloudPlatform/k8s-config-connector/apis/alloydb/v1beta1" refsv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/config" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/directbase" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/registry" gcp "cloud.google.com/go/alloydb/apiv1beta" alloydbpb "cloud.google.com/go/alloydb/apiv1beta/alloydbpb" "github.com/google/go-cmp/cmp" "google.golang.org/api/option" "google.golang.org/protobuf/types/known/fieldmaskpb" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) func init() { registry.RegisterModel(krm.AlloyDBInstanceGVK, NewInstanceModel) } func NewInstanceModel(ctx context.Context, config *config.ControllerConfig) (directbase.Model, error) { return &instanceModel{config: *config}, nil } var _ directbase.Model = &instanceModel{} type instanceModel struct { config config.ControllerConfig } func (m *instanceModel) client(ctx context.Context) (*gcp.AlloyDBAdminClient, error) { var opts []option.ClientOption opts, err := m.config.RESTClientOptions() if err != nil { return nil, err } gcpClient, err := gcp.NewAlloyDBAdminRESTClient(ctx, opts...) if err != nil { return nil, fmt.Errorf("error building AlloyDB client for Instance: %w", err) } return gcpClient, err } func resolveInstanceType(ctx context.Context, reader client.Reader, obj *krm.AlloyDBInstance, isDeletion bool) error { if obj.Spec.InstanceType == nil && obj.Spec.InstanceTypeRef == nil { return fmt.Errorf("at least one of 'spec.InstanceTypeRef' " + "and 'spec.InstanceType' should be configured: neither is configured") } var instanceType *string if obj.Spec.InstanceType != nil { instanceType = obj.Spec.InstanceType } if obj.Spec.InstanceTypeRef != nil { plainTextInstanceType := instanceType var err error instanceType, err = refsv1beta1.ResolveAlloyDBClusterType(ctx, reader, obj, obj.Spec.InstanceTypeRef) if err != nil { if !isDeletion { // TODO: Read instance type from observed state because it's necessary during deletion. return fmt.Errorf("cannot resolve `spec.InstanceTypeRef`: %w", err) } } if plainTextInstanceType != nil && *plainTextInstanceType != *instanceType { return fmt.Errorf("'spec.InstanceTypeRef' and 'spec.InstanceType' "+ "resolve into different values: spec.InstanceTypeRef resolves to "+ "instanceType %q, spec.InstanceType is %q: they must be the same", *instanceType, *plainTextInstanceType) } } if *instanceType == "" { return fmt.Errorf("instance type should be a non-empty string, " + "ensure the referenced AlloyDBCluster in `spec.instancerTypeRef` " + "has the correct cluster type or `spec.instancerTypeRef.external` " + "has a correct instance type") } if obj.Spec.InstanceTypeRef == nil { obj.Spec.InstanceTypeRef = &refsv1beta1.AlloyDBClusterTypeRef{} } obj.Spec.InstanceTypeRef.External = *instanceType return nil } func (m *instanceModel) AdapterForObject(ctx context.Context, reader client.Reader, u *unstructured.Unstructured) (directbase.Adapter, error) { obj := &krm.AlloyDBInstance{} if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &obj); err != nil { return nil, fmt.Errorf("error converting to %T: %w", obj, err) } id, err := krm.NewInstanceIdentity(ctx, reader, obj) if err != nil { return nil, err } // Get alloydb GCP client gcpClient, err := m.client(ctx) if err != nil { return nil, err } return &instanceAdapter{ id: id, gcpClient: gcpClient, reader: reader, desired: obj, }, nil } func (m *instanceModel) AdapterForURL(ctx context.Context, url string) (directbase.Adapter, error) { // TODO: Support URLs return nil, nil } type instanceAdapter struct { id *krm.InstanceIdentity gcpClient *gcp.AlloyDBAdminClient reader client.Reader desired *krm.AlloyDBInstance actual *alloydbpb.Instance } var _ directbase.Adapter = &instanceAdapter{} // Find retrieves the GCP resource. // Return true means the object is found. This triggers Adapter `Update` call. // Return false means the object is not found. This triggers Adapter `Create` call. // Return a non-nil error requeues the requests. func (a *instanceAdapter) Find(ctx context.Context) (bool, error) { log := klog.FromContext(ctx) log.V(2).Info("getting instance", "name", a.id) req := &alloydbpb.GetInstanceRequest{Name: a.id.String()} instancepb, err := a.gcpClient.GetInstance(ctx, req) if err != nil { log.V(2).Info("error getting instance", "name", a.id, "error", err) if direct.IsNotFound(err) { return false, nil } return false, fmt.Errorf("getting instance %q: %w", a.id, err) } a.actual = instancepb return true, nil } // Create creates the resource in GCP based on `spec` and update the Config Connector object `status` based on the GCP response. func (a *instanceAdapter) Create(ctx context.Context, createOp *directbase.CreateOperation) error { log := klog.FromContext(ctx) log.V(2).Info("creating instance", "name", a.id) mapCtx := &direct.MapContext{} if err := resolveInstanceType(ctx, a.reader, a.desired, false); err != nil { return err } desired := a.desired.DeepCopy() resource := AlloyDBInstanceSpec_ToProto(mapCtx, &desired.Spec) if mapCtx.Err() != nil { return mapCtx.Err() } resource.Labels = make(map[string]string) for k, v := range a.desired.GetObjectMeta().GetLabels() { resource.Labels[k] = v } resource.Labels["managed-by-cnrm"] = "true" var created *alloydbpb.Instance instanceType := a.desired.Spec.InstanceTypeRef.External if instanceType == "SECONDARY" { req := &alloydbpb.CreateSecondaryInstanceRequest{ Parent: a.id.Parent().String(), InstanceId: a.id.ID(), Instance: resource, } op, err := a.gcpClient.CreateSecondaryInstance(ctx, req) if err != nil { log.V(2).Info("error creating secondary instance", "name", a.id, "error", err) return fmt.Errorf("creating secondary instance %s: %w", a.id, err) } created, err = op.Wait(ctx) if err != nil { log.V(2).Info("error waiting secondary instance creation", "name", a.id, "error", err) return fmt.Errorf("secondary instance %s waiting creation: %w", a.id, err) } log.V(2).Info("successfully created secondary instance", "name", a.id) } else { req := &alloydbpb.CreateInstanceRequest{ Parent: a.id.Parent().String(), InstanceId: a.id.ID(), Instance: resource, } op, err := a.gcpClient.CreateInstance(ctx, req) if err != nil { log.V(2).Info("error creating instance", "name", a.id, "error", err) return fmt.Errorf("creating instance %s: %w", a.id, err) } created, err = op.Wait(ctx) if err != nil { log.V(2).Info("error waiting instance creation", "name", a.id, "error", err) return fmt.Errorf("instance %s waiting creation: %w", a.id, err) } log.V(2).Info("successfully created instance", "name", a.id) } status := AlloyDBInstanceStatus_FromProto(mapCtx, created) if mapCtx.Err() != nil { return mapCtx.Err() } status.ExternalRef = direct.LazyPtr(a.id.String()) return createOp.UpdateStatus(ctx, status, nil) } // Update updates the resource in GCP based on `spec` and update the Config Connector object `status` based on the GCP response. func (a *instanceAdapter) Update(ctx context.Context, updateOp *directbase.UpdateOperation) error { log := klog.FromContext(ctx) log.V(2).Info("updating instance", "name", a.id) mapCtx := &direct.MapContext{} if err := resolveInstanceType(ctx, a.reader, a.desired, false); err != nil { return err } parsedActual := AlloyDBInstanceSpec_FromProto(mapCtx, a.actual) if mapCtx.Err() != nil { return mapCtx.Err() } // TODO: Change to CompareProtoMessage once we support all the files in the instance pb. updatePaths, err := compareInstance(ctx, parsedActual, &a.desired.Spec) if err != nil { return err } desiredLabels := a.desired.GetObjectMeta().GetLabels() if desiredLabels == nil { desiredLabels = make(map[string]string) } desiredLabels["managed-by-cnrm"] = "true" if !reflect.DeepEqual(a.actual.GetLabels(), desiredLabels) { log.V(2).Info("'metadata.labels' field is updated (-old +new)", cmp.Diff(a.actual.GetLabels(), desiredLabels)) updatePaths = append(updatePaths, "labels") } if len(updatePaths) == 0 { log.V(2).Info("no field needs update", "name", a.id) if *a.desired.Status.ExternalRef == "" { // If it is the first reconciliation after switching to direct controller, // or is an acquisition, then update Status to fill out the ExternalRef // and ObservedState. status := AlloyDBInstanceStatus_FromProto(mapCtx, a.actual) if mapCtx.Err() != nil { return mapCtx.Err() } status.ExternalRef = direct.LazyPtr(a.id.String()) return updateOp.UpdateStatus(ctx, status, nil) } return nil } updateMask := &fieldmaskpb.FieldMask{ Paths: updatePaths, } desiredPb := AlloyDBInstanceSpec_ToProto(mapCtx, &a.desired.DeepCopy().Spec) desiredPb.Labels = desiredLabels desiredPb.Name = a.id.String() req := &alloydbpb.UpdateInstanceRequest{ UpdateMask: updateMask, Instance: desiredPb, } op, err := a.gcpClient.UpdateInstance(ctx, req) if err != nil { log.V(2).Info("error updating instance", "name", a.id, "error", err) return fmt.Errorf("updating instance %s: %w", a.id, err) } updated, err := op.Wait(ctx) if err != nil { log.V(2).Info("error waiting instance update", "name", a.id, "error", err) return fmt.Errorf("instance %s waiting update: %w", a.id, err) } log.V(2).Info("successfully updated instance", "name", a.id) status := AlloyDBInstanceStatus_FromProto(mapCtx, updated) if mapCtx.Err() != nil { return mapCtx.Err() } if *a.desired.Status.ExternalRef == "" { // If it is the first reconciliation after switching to direct controller, // or is an acquisition with updates, then fill out the ExternalRef. status.ExternalRef = direct.LazyPtr(a.id.String()) } return updateOp.UpdateStatus(ctx, status, nil) } func compareInstance(ctx context.Context, actual, desired *krm.AlloyDBInstanceSpec) (updatePaths []string, err error) { log := klog.FromContext(ctx) updatePaths = make([]string, 0) if !reflect.DeepEqual(actual.Annotations, desired.Annotations) { log.V(2).Info("'spec.annotations' field is updated (-old +new)", cmp.Diff(actual.Annotations, desired.Annotations)) updatePaths = append(updatePaths, "annotations") } // TODO: Test case with availability type unset. if desired.AvailabilityType != nil && !reflect.DeepEqual(actual.AvailabilityType, desired.AvailabilityType) { log.V(2).Info("'spec.availabilityType' field is updated (-old +new)", cmp.Diff(actual.AvailabilityType, desired.AvailabilityType)) updatePaths = append(updatePaths, "availability_type") } // TODO: Test "copied" behavior for read pool // TODO: Test "overridden" behavior for read pool // Default value of databaseFlags is unknown for a read instance unless we // make API calls to get the database flags of the primary instance. if desired.DatabaseFlags != nil && !reflect.DeepEqual(actual.DatabaseFlags, desired.DatabaseFlags) { log.V(2).Info("'spec.databaseFlags' field is updated (-old +new)", cmp.Diff(actual.DatabaseFlags, desired.DatabaseFlags)) updatePaths = append(updatePaths, "database_flags") } if desired.DisplayName != nil && !reflect.DeepEqual(actual.DisplayName, desired.DisplayName) { log.V(2).Info("'spec.displayName' field is updated (-old +new)", cmp.Diff(actual.DisplayName, desired.DisplayName)) updatePaths = append(updatePaths, "display_name") } if desired.GCEZone != nil && !reflect.DeepEqual(actual.GCEZone, desired.GCEZone) { log.V(2).Info("'spec.gceZone' field is updated (-old +new)", cmp.Diff(actual.GCEZone, desired.GCEZone)) updatePaths = append(updatePaths, "gce_zone") } if desired.InstanceTypeRef != nil && !reflect.DeepEqual(actual.InstanceTypeRef.External, desired.InstanceTypeRef.External) { log.V(2).Info("'spec.instanceTypeRef' field is updated (-old +new)", cmp.Diff(actual.InstanceTypeRef.External, desired.InstanceTypeRef.External)) return nil, fmt.Errorf("cannot change immutable field %s from %v to %v", "'spec.instanceTypeRef'", actual.InstanceTypeRef.External, desired.InstanceTypeRef.External) } // TODO: Test machineConfig unset and empty struct if desired.MachineConfig != nil { if desired.MachineConfig.CPUCount != nil && !reflect.DeepEqual(actual.MachineConfig.CPUCount, desired.MachineConfig.CPUCount) { log.V(2).Info("'spec.machineConfig.cpuCount' field is updated (-old +new)", cmp.Diff(actual.MachineConfig.CPUCount, desired.MachineConfig.CPUCount)) updatePaths = append(updatePaths, "machine_config.cpu_count") } } if desired.NetworkConfig != nil { if desired.NetworkConfig.EnablePublicIP != nil && !reflect.DeepEqual(actual.NetworkConfig.EnablePublicIP, desired.NetworkConfig.EnablePublicIP) { log.V(2).Info("'spec.networkConfig.enablePublicIp' field is updated (-old +new)", cmp.Diff(actual.NetworkConfig.EnablePublicIP, desired.NetworkConfig.EnablePublicIP)) updatePaths = append(updatePaths, "network_config.enable_public_ip") } if desired.NetworkConfig.EnableOutboundPublicIP != nil && !reflect.DeepEqual(actual.NetworkConfig.EnableOutboundPublicIP, desired.NetworkConfig.EnableOutboundPublicIP) { log.V(2).Info("'spec.networkConfig.enableOutboundPublicIp' field is updated (-old +new)", cmp.Diff(actual.NetworkConfig.EnableOutboundPublicIP, desired.NetworkConfig.EnableOutboundPublicIP)) updatePaths = append(updatePaths, "network_config.enable_outbound_public_ip") } if desired.NetworkConfig.AuthorizedExternalNetworks != nil && !reflect.DeepEqual(actual.NetworkConfig.AuthorizedExternalNetworks, desired.NetworkConfig.AuthorizedExternalNetworks) { log.V(2).Info("'spec.networkConfig.authorizedExternalNetworks' field is updated (-old +new)", cmp.Diff(actual.NetworkConfig.AuthorizedExternalNetworks, desired.NetworkConfig.AuthorizedExternalNetworks)) updatePaths = append(updatePaths, "network_config.authorized_external_networks") } } if desired.ReadPoolConfig != nil { if desired.ReadPoolConfig.NodeCount != nil && !reflect.DeepEqual(actual.ReadPoolConfig.NodeCount, desired.ReadPoolConfig.NodeCount) { log.V(2).Info("'spec.readPoolConfig.nodeCount' field is updated (-old +new)", cmp.Diff(actual.ReadPoolConfig.NodeCount, desired.ReadPoolConfig.NodeCount)) updatePaths = append(updatePaths, "read_pool_config.node_count") } } return updatePaths, nil } // Export maps the GCP object to a Config Connector resource `spec`. func (a *instanceAdapter) Export(ctx context.Context) (*unstructured.Unstructured, error) { if a.actual == nil { return nil, fmt.Errorf("Find() not called") } u := &unstructured.Unstructured{} obj := &krm.AlloyDBInstance{} mapCtx := &direct.MapContext{} obj.Spec = direct.ValueOf(AlloyDBInstanceSpec_FromProto(mapCtx, a.actual)) if mapCtx.Err() != nil { return nil, mapCtx.Err() } uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { return nil, err } // Split name into tokens and use ID. u.SetName(a.actual.Name) u.SetGroupVersionKind(krm.AlloyDBInstanceGVK) u.Object = uObj return u, nil } // Delete the resource from GCP service when the corresponding Config Connector resource is deleted. func (a *instanceAdapter) Delete(ctx context.Context, deleteOp *directbase.DeleteOperation) (bool, error) { log := klog.FromContext(ctx) log.V(2).Info("deleting instance", "name", a.id) // instanceType must be resolved before calling DELETE. if err := resolveInstanceType(ctx, a.reader, a.desired, true); err != nil { return false, err } // Returning true directly if it is to delete a secondary instance. // This is because deletion of secondary instance is not supported. Instead, // users should delete the secondary cluster which will forcefully delete // the associated secondary instance. instanceType := a.desired.Spec.InstanceTypeRef.External if instanceType == "SECONDARY" { log.V(2).Info("This operation didn't delete the secondary instance. You need to delete the associated secondary cluster to delete the secondary instance (and the entire secondary cluster).", "name", a.id) return true, nil } req := &alloydbpb.DeleteInstanceRequest{Name: a.id.String()} op, err := a.gcpClient.DeleteInstance(ctx, req) if err != nil { log.V(2).Info("error deleting instance", "name", a.id, "error", err) if direct.IsNotFound(err) { return false, nil } return false, fmt.Errorf("deleting instance %s: %w", a.id, err) } err = op.Wait(ctx) if err != nil { // "(line 15:3): missing \"value\" field" is likely a protolib error but // not a real server side error. When it happens, it usually means the // deletion has been completed successfully but the returned empty // struct can't be parsed by the protolib. if !strings.Contains(err.Error(), "(line 15:3): missing \"value\" field") { log.V(2).Info("error waiting instance delete", "name", a.id, "error", err) return false, fmt.Errorf("waiting delete instance %s: %w", a.id, err) } } log.V(2).Info("successfully deleted instance", "name", a.id) return true, nil }