shardingsphere-operator/pkg/controllers/storage_node_controller.go

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controllers

import (
	"context"
	"fmt"
	"reflect"
	"strings"

	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
	cloudnativepg "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/cloudnative-pg"
	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/service"
	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws"
	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/shardingsphere"

	cnpg "github.com/cloudnative-pg/cloudnative-pg/api/v1"
	cnpgutils "github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
	dbmeshaws "github.com/database-mesh/golang-sdk/aws"
	dbmeshawsrds "github.com/database-mesh/golang-sdk/aws/client/rds"
	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/tools/record"
	"k8s.io/utils/strings/slices"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

const (
	StorageNodeControllerName = "storage-node-controller"

	FinalizerName = "shardingsphere.apache.org/finalizer"

	AnnotationKeyRegisterStorageUnitEnabled = "shardingsphere.apache.org/register-storage-unit-enabled"
	AnnotationKeyComputeNodeName            = "shardingsphere.apache.org/compute-node-name"
	AnnotationKeyLogicDatabaseName          = "shardingsphere.apache.org/logic-database-name"

	ShardingSphereProtocolType = "proxy-frontend-database-protocol-type"
)

// StorageNodeReconciler is a controller for storage nodes
type StorageNodeReconciler struct {
	client.Client

	Scheme   *runtime.Scheme
	Log      logr.Logger
	Recorder record.EventRecorder
	Service  service.Service

	AwsRegion          string
	AwsAccessKeyID     string
	AwsSecretAccessKey string
	AwsSessions        dbmeshaws.Sessions

	CNPG cloudnativepg.CloudNativePG
}

// +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storagenodes,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storagenodes/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storagenodes/finalizers,verbs=update
// +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storageproviders,verbs=get;list;watch
// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch

// Reconcile is the main reconciliation loop for StorageNode resources.
// nolint:gocognit
func (r *StorageNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
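	// Overall flow: fetch the StorageNode, resolve its StorageProvider,
	// ensure our finalizer is present, then either finalize (when deletion is
	// in progress) or reconcile the backing database and register it with the
	// ShardingSphere Proxy.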
	r.Log.Info(fmt.Sprintf("Reconciling StorageNode %s", req.NamespacedName))

	// get storage node
	node := &v1alpha1.StorageNode{}
	if err := r.Get(ctx, req.NamespacedName, node); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Get storageProvider with storagenode.Spec.StorageProviderName
	storageProvider, err := r.getStorageProvider(ctx, node)
	if err != nil {
		r.Log.Error(err, fmt.Sprintf("unable to fetch storageProvider %s", node.Spec.StorageProviderName))
		return ctrl.Result{Requeue: true}, err
	}

	// finalize storage node
	if node.ObjectMeta.DeletionTimestamp.IsZero() {
		// The object is not being deleted, so if it does not have our finalizer,
		// then let's add the finalizer and update the object. This is equivalent
		// to registering our finalizer.
		if !slices.Contains(node.ObjectMeta.Finalizers, FinalizerName) {
			node.ObjectMeta.Finalizers = append(node.ObjectMeta.Finalizers, FinalizerName)
			if err := r.Update(ctx, node); err != nil {
				return ctrl.Result{}, err
			}
		}
	} else if slices.Contains(node.ObjectMeta.Finalizers, FinalizerName) {
		return r.finalize(ctx, node, storageProvider)
	}

	// reconcile storage node
	return r.reconcile(ctx, storageProvider, node)
}

func (r *StorageNodeReconciler) finalizeCloudNativePG(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) (ctrl.Result, error) {
	namespacedName := types.NamespacedName{
		Name:      node.Name,
		Namespace: node.Namespace,
	}

	cluster, err := r.CNPG.GetClusterByNamespacedName(ctx, namespacedName)
	if err != nil {
		return ctrl.Result{Requeue: true}, err
	}
	if cluster != nil {
		if err := r.CNPG.Delete(ctx, cluster); err != nil {
			return ctrl.Result{Requeue: true}, err
		}
	}

	// Remove the finalizer this controller registered in Reconcile; removing
	// any other finalizer here would leave the StorageNode stuck in Terminating.
	controllerutil.RemoveFinalizer(node, FinalizerName)
	if err := r.Update(ctx, node); err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
}

func (r *StorageNodeReconciler) finalize(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) (ctrl.Result, error) {
	var err error
	var oldStatus = node.Status.DeepCopy()

	switch node.Status.Phase {
	case v1alpha1.StorageNodePhaseReady, v1alpha1.StorageNodePhaseNotReady:
		// set storage node status to deleting
		node.Status.Phase = v1alpha1.StorageNodePhaseDeleting
	case v1alpha1.StorageNodePhaseDeleting:
		// deletion already in progress; fall through to the cleanup below
	case v1alpha1.StorageNodePhaseDeleteComplete:
		node.ObjectMeta.Finalizers = slices.Filter([]string{}, node.ObjectMeta.Finalizers, func(f string) bool {
			return f != FinalizerName
		})
		if err = r.Update(ctx, node); err != nil {
			r.Log.Error(err, "failed to remove finalizer")
		}
		return ctrl.Result{}, nil
	}

	if storageProvider.Spec.Provisioner == v1alpha1.ProvisionerCloudNativePG {
		return r.finalizeCloudNativePG(ctx, node, storageProvider)
	}

	// Try to unregister storage unit in shardingsphere.
	if err = r.unregisterStorageUnit(ctx, node); err != nil {
		r.Log.Error(err, "failed to delete storage unit")
		return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
	}

	if err = r.deleteDatabaseCluster(ctx, node, storageProvider); err != nil {
		r.Log.Error(err, "failed to delete database cluster")
		return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
	}

	desiredState := computeDesiredState(node.Status)

	// oldStatus is a deep copy (a pointer), so compare the dereferenced value
	if !reflect.DeepEqual(*oldStatus, desiredState) {
		node.Status = desiredState
		err := r.Status().Update(ctx, node)
		if err != nil {
			r.Log.Error(err, fmt.Sprintf("unable to update StorageNode %s/%s", node.GetNamespace(), node.GetName()))
			return ctrl.Result{Requeue: true}, err
		}
	}

	return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
}

func (r *StorageNodeReconciler) reconcile(ctx context.Context, storageProvider *v1alpha1.StorageProvider, node *v1alpha1.StorageNode) (ctrl.Result, error) {
	var err error
	var oldStatus = node.Status.DeepCopy()

	// reconcile storage node with storageProvider
	switch storageProvider.Spec.Provisioner {
	case v1alpha1.ProvisionerAWSRDSInstance:
		if err := r.reconcileAwsRdsInstance(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "ReconcileFailed", fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
		}
	case v1alpha1.ProvisionerAWSRDSCluster:
		if err := r.reconcileAwsRDSCluster(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "ReconcileFailed", fmt.Sprintf("unable to reconcile AWS RDS Cluster %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
		}
	case v1alpha1.ProvisionerAWSAurora:
		if err := r.reconcileAwsAurora(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "ReconcileFailed", fmt.Sprintf("unable to reconcile AWS Aurora %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
		}
	case v1alpha1.ProvisionerCloudNativePG:
		if err := r.reconcileCloudNativePG(ctx, node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "ReconcileFailed", fmt.Sprintf("unable to reconcile CloudNative PG %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
		}
	default:
		r.Recorder.Event(node, corev1.EventTypeWarning, "UnsupportedDatabaseProvisioner", fmt.Sprintf("unsupported database provisioner %s", storageProvider.Spec.Provisioner))
		// err is still nil here; requeue and wait for a supported provisioner
		return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
	}

	// register storage unit if needed.
	if err := r.registerStorageUnit(ctx, node, storageProvider); err != nil {
		r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterStorageUnitFailed", "unable to register storage unit %s/%s", node.GetNamespace(), node.GetName())
		return ctrl.Result{Requeue: true}, err
	}

	desiredState := computeDesiredState(node.Status)

	if !reflect.DeepEqual(*oldStatus, desiredState) {
		node.Status = desiredState
		err := r.Status().Update(ctx, node)
		if err != nil {
			r.Log.Error(err, fmt.Sprintf("unable to update StorageNode %s/%s", node.GetNamespace(), node.GetName()))
			return ctrl.Result{Requeue: true}, err
		}
	}

	return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
}

func (r *StorageNodeReconciler) getStorageProvider(ctx context.Context, node *v1alpha1.StorageNode) (storageProvider *v1alpha1.StorageProvider, err error) {
	if node.Spec.StorageProviderName == "" {
		r.Recorder.Event(node, corev1.EventTypeWarning, "storageProviderNameIsNil", "storageProviderName is nil")
		return nil, fmt.Errorf("storageProviderName is nil")
	}

	storageProvider = &v1alpha1.StorageProvider{}
	if err := r.Get(ctx, client.ObjectKey{Name: node.Spec.StorageProviderName}, storageProvider); err != nil {
		r.Log.Error(err, fmt.Sprintf("unable to fetch storageProvider %s", node.Spec.StorageProviderName))
		r.Recorder.Event(node, corev1.EventTypeWarning, "storageProviderNotFound", fmt.Sprintf("storageProvider %s not found", node.Spec.StorageProviderName))
		return nil, err
	}

	// check provisioner
	// aws-like provisioners need an aws rds client
	if storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSRDSInstance ||
		storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSAurora ||
		storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSRDSCluster {
		if r.AwsRegion == "" || r.AwsAccessKeyID == "" || r.AwsSecretAccessKey == "" {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "awsCredentialsNotSet", "aws credentials not set")
			return nil, fmt.Errorf("aws credentials not set")
		}
	}

	return storageProvider, nil
}

// nolint:gocritic
func computeDesiredState(status v1alpha1.StorageNodeStatus) v1alpha1.StorageNodeStatus {
	// Initialize a new status object based on the current state
	desiredState := status
	clusterStatus := status.Cluster.Status

	if status.Phase == v1alpha1.StorageNodePhaseDeleting {
		// If the storage node is being deleted, check if all instances are deleted.
		if clusterStatus == "" && len(status.Instances) == 0 {
			desiredState.Phase = v1alpha1.StorageNodePhaseDeleteComplete
		}
	} else {
		// If the storage node is not being deleted, check if all instances are ready.
		if (clusterStatus == "" || clusterStatus == string(dbmeshawsrds.DBClusterStatusAvailable)) && allInstancesReady(status.Instances) {
			desiredState.Phase = v1alpha1.StorageNodePhaseReady
		} else {
			desiredState.Phase = v1alpha1.StorageNodePhaseNotReady
		}
	}

	desiredState.Conditions = computeNewConditions(desiredState, status, clusterStatus)

	return desiredState
}

// nolint:gocritic
func computeNewConditions(desiredState, status v1alpha1.StorageNodeStatus, clusterStatus string) v1alpha1.StorageNodeConditions {
	newSNConditions := status.Conditions

	// Update the cluster ready condition if the cluster status is not empty
	if clusterStatus != "" {
		if clusterStatus == string(dbmeshawsrds.DBClusterStatusAvailable) {
			newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{
				Type:           v1alpha1.StorageNodeConditionTypeClusterReady,
				Status:         corev1.ConditionTrue,
				LastUpdateTime: metav1.Now(),
				Reason:         "Cluster is ready",
			})
		} else {
			newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{
				Type:           v1alpha1.StorageNodeConditionTypeClusterReady,
				Status:         corev1.ConditionFalse,
				LastUpdateTime: metav1.Now(),
				Reason:         "Cluster is not ready",
			})
		}
	} else {
		newSNConditions.RemoveCondition(v1alpha1.StorageNodeConditionTypeClusterReady)
	}

	// Update the available condition based on the phase
	if desiredState.Phase == v1alpha1.StorageNodePhaseReady {
		newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{
			Type:           v1alpha1.StorageNodeConditionTypeAvailable,
			Status:         corev1.ConditionTrue,
			LastUpdateTime: metav1.Now(),
			Reason:         "All instances are ready",
		})
	} else {
		newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{
			Type:           v1alpha1.StorageNodeConditionTypeAvailable,
			Status:         corev1.ConditionFalse,
			LastUpdateTime: metav1.Now(),
			Reason:         "One or more instances are not ready",
		})
	}

	// Update the registered condition
	if status.Registered {
		newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{
			Type:           v1alpha1.StorageNodeConditionTypeRegistered,
			Status:         corev1.ConditionTrue,
			LastUpdateTime: metav1.Now(),
			Reason:         "StorageNode is registered",
		})
	} else {
		newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{
			Type:           v1alpha1.StorageNodeConditionTypeRegistered,
			Status:         corev1.ConditionFalse,
			LastUpdateTime: metav1.Now(),
			Reason:         "StorageNode is not registered",
		})
	}

	return newSNConditions
}

// allInstancesReady returns true if all instances are ready, false otherwise
func allInstancesReady(instances []v1alpha1.InstanceStatus) bool {
	if len(instances) == 0 {
		return false
	}

	for idx := range instances {
		if instances[idx].Status != string(dbmeshawsrds.DBInstanceStatusAvailable) {
			return false
		}
	}

	return true
}

func (r *StorageNodeReconciler) getAwsRdsClient() aws.IRdsClient {
	// lazily build and cache an AWS session for the configured region
	if _, ok := r.AwsSessions[r.AwsRegion]; !ok {
		r.AwsSessions = dbmeshaws.NewSessions().SetCredential(r.AwsRegion, r.AwsAccessKeyID, r.AwsSecretAccessKey).Build()
	}

	return aws.NewRdsClient(dbmeshawsrds.NewService(r.AwsSessions[r.AwsRegion]))
}

func (r *StorageNodeReconciler) reconcileAwsRdsInstance(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
	instance, err := rdsClient.GetInstance(ctx, node)
	if err != nil {
		return err
	}

	if instance == nil && node.Status.Phase != v1alpha1.StorageNodePhaseDeleting {
		err = rdsClient.CreateInstance(ctx, node, storageProvider.Spec.Parameters)
		if err != nil {
			return err
		}

		instance, err = rdsClient.GetInstance(ctx, node)
		if err != nil {
			return err
		}
	}

	if err := updateAWSRDSInstanceStatus(node, instance); err != nil {
		return fmt.Errorf("updateAWSRDSInstanceStatus failed: %w", err)
	}

	return nil
}

func updateAWSRDSInstanceStatus(node *v1alpha1.StorageNode, instance *dbmeshawsrds.DescInstance) error {
	instances := make([]v1alpha1.InstanceStatus, 0)

	if instance == nil {
		node.Status.Instances = instances
		return nil
	}

	instances = append(instances, v1alpha1.InstanceStatus{
		Endpoint: v1alpha1.Endpoint{
			Address: instance.Endpoint.Address,
			Port:    instance.Endpoint.Port,
		},
		Status: string(instance.DBInstanceStatus),
	})

	node.Status.Instances = instances

	return nil
}

// nolint:dupl
func (r *StorageNodeReconciler) reconcileAwsRDSCluster(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
	rc, err := rdsClient.GetRDSCluster(ctx, node)
	if err != nil {
		return err
	}
	if rc == nil {
		// create cluster
		err = rdsClient.CreateRDSCluster(ctx, node, storageProvider.Spec.Parameters)
		if err != nil {
			return err
		}

		rc, err = rdsClient.GetRDSCluster(ctx, node)
		if err != nil {
			return err
		}
	}

	// update storage node status
	if err := updateClusterStatus(ctx, rdsClient, node, rc); err != nil {
		return fmt.Errorf("updateClusterStatus failed: %w", err)
	}

	return nil
}

// nolint:dupl
func (r *StorageNodeReconciler) reconcileAwsAurora(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
	ac, err := rdsClient.GetAuroraCluster(ctx, node)
	if err != nil {
		return err
	}
	if ac == nil {
		// create cluster
		err = rdsClient.CreateAuroraCluster(ctx, node, storageProvider.Spec.Parameters)
		if err != nil {
			return err
		}

		ac, err = rdsClient.GetAuroraCluster(ctx, node)
		if err != nil {
			return err
		}
	}

	// update storage node status
	if err := updateClusterStatus(ctx, rdsClient, node, ac); err != nil {
		return fmt.Errorf("updateClusterStatus failed: %w", err)
	}

	return nil
}

func updateClusterStatus(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, cluster *dbmeshawsrds.DescCluster) error {
	// update cluster status
	clusterStatus := v1alpha1.ClusterStatus{}

	if cluster != nil {
		clusterStatus = v1alpha1.ClusterStatus{
			Status: cluster.Status,
			PrimaryEndpoint: v1alpha1.Endpoint{
				Address: cluster.PrimaryEndpoint,
				Port:    cluster.Port,
			},
			ReaderEndpoints: []v1alpha1.Endpoint{
				{
					Address: cluster.ReaderEndpoint,
					Port:    cluster.Port,
				},
			},
		}
	}

	node.Status.Cluster = clusterStatus

	// update instances status
	identifier := node.Annotations[v1alpha1.AnnotationsClusterIdentifier]
	filters := map[string][]string{
		"db-cluster-id": {identifier},
	}

	instances, err := rdsClient.GetInstancesByFilters(ctx, filters)
	if err != nil {
		return fmt.Errorf("GetInstances failed, err:%w", err)
	}

	var instanceStatus []v1alpha1.InstanceStatus
	for _, instance := range instances {
		instanceStatus = append(instanceStatus, v1alpha1.InstanceStatus{
			Status: string(instance.DBInstanceStatus),
			Endpoint: v1alpha1.Endpoint{
				Address: instance.Endpoint.Address,
				Port:    instance.Endpoint.Port,
			},
		})
	}

	node.Status.Instances = instanceStatus

	return nil
}

// deleteDatabaseCluster deletes the backing database for the given provisioner.
func (r *StorageNodeReconciler) deleteDatabaseCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
	switch storageProvider.Spec.Provisioner {
	case v1alpha1.ProvisionerAWSRDSInstance:
		if err := r.deleteAWSRDSInstance(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
			return fmt.Errorf("delete aws rds instance failed: %w", err)
		}
	case v1alpha1.ProvisionerAWSRDSCluster:
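		// The cluster-scoped deletes below mirror the instance path: skip when
		// the cluster is already deleting, then refresh the recorded status so
		// finalization can observe progress.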
		if err := r.deleteAWSRDSCluster(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
			return fmt.Errorf("delete aws rds cluster failed: %w", err)
		}
	case v1alpha1.ProvisionerAWSAurora:
		if err := r.deleteAWSAurora(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
			return fmt.Errorf("delete aws aurora cluster failed: %w", err)
		}
	default:
		return fmt.Errorf("unsupported database provisioner %s", storageProvider.Spec.Provisioner)
	}

	return nil
}

func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
	if node.Annotations[v1alpha1.AnnotationsInstanceIdentifier] == "" {
		return nil
	}

	instance, err := rdsClient.GetInstance(ctx, node)
	if err != nil {
		return err
	}

	if instance != nil && instance.DBInstanceStatus != dbmeshawsrds.DBInstanceStatusDeleting {
		if err := rdsClient.DeleteInstance(ctx, node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete instance %s: %s", node.Annotations[v1alpha1.AnnotationsInstanceIdentifier], err.Error())
			return err
		}
		r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("instance %s is deleting", node.Annotations[v1alpha1.AnnotationsInstanceIdentifier]))
	}

	// update instance status
	if err := updateAWSRDSInstanceStatus(node, instance); err != nil {
		return fmt.Errorf("updateAWSRDSInstanceStatus failed: %w", err)
	}

	return nil
}

// nolint:dupl
func (r *StorageNodeReconciler) deleteAWSRDSCluster(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
	if node.Annotations[v1alpha1.AnnotationsClusterIdentifier] == "" {
		return nil
	}

	cluster, err := rdsClient.GetRDSCluster(ctx, node)
	if err != nil {
		return fmt.Errorf("get rds cluster failed: %w", err)
	}

	if cluster != nil && cluster.Status != string(dbmeshawsrds.DBClusterStatusDeleting) {
		if err := rdsClient.DeleteRDSCluster(ctx, node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete rds cluster %s: %s", node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error())
			return err
		}
		r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("rds cluster %s is deleting", node.Annotations[v1alpha1.AnnotationsClusterIdentifier]))
	}

	// update storage node status
	if err := updateClusterStatus(ctx, rdsClient, node, cluster); err != nil {
		return fmt.Errorf("updateClusterStatus failed: %w", err)
	}

	return nil
}

// nolint:dupl
func (r *StorageNodeReconciler) deleteAWSAurora(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
	if node.Annotations[v1alpha1.AnnotationsClusterIdentifier] == "" {
		return nil
	}

	auroraCluster, err := rdsClient.GetAuroraCluster(ctx, node)
	if err != nil {
		return fmt.Errorf("get aurora cluster failed: %w", err)
	}

	if auroraCluster != nil && auroraCluster.Status != string(dbmeshawsrds.DBClusterStatusDeleting) {
		if err := rdsClient.DeleteAuroraCluster(ctx, node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete aurora cluster %s: %s", node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error())
			return err
		}
		r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("aurora cluster %s is deleting", node.Annotations[v1alpha1.AnnotationsClusterIdentifier]))
	}

	// update storage node status
	if err := updateClusterStatus(ctx, rdsClient, node, auroraCluster); err != nil {
		return fmt.Errorf("updateClusterStatus failed: %w", err)
	}

	return nil
}

// registerStorageUnit registers the provisioned database as a storage unit of the ShardingSphere Proxy.
func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
	// if register storage unit is not enabled, return
	if node.Annotations[AnnotationKeyRegisterStorageUnitEnabled] != "true" {
		return nil
	}

	if err := r.validateComputeNodeAnnotations(node); err != nil {
		return err
	}

	// if storage unit is already registered, return
	if node.Status.Registered {
		return nil
	}

	// if node is not ready, return
	if node.Status.Phase != v1alpha1.StorageNodePhaseReady {
		r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterWaiting", "Waiting to register storage unit for node %s/%s: node is not ready", node.GetNamespace(), node.GetName())
		return nil
	}

	logicDBName := node.Annotations[AnnotationKeyLogicDatabaseName]
	dbName := node.Annotations[v1alpha1.AnnotationsInstanceDBName]

	ssServer, err := r.getShardingsphereServer(ctx, node)
	if err != nil {
		return fmt.Errorf("getShardingsphereServer failed: %w", err)
	}
	defer ssServer.Close()

	if err := ssServer.CreateDatabase(logicDBName); err != nil {
		return fmt.Errorf("create database failed: %w", err)
	}
	r.Recorder.Eventf(node, corev1.EventTypeNormal, "LogicDatabaseCreated", "LogicDatabase %s is created", logicDBName)

	var host string
	var port int32
	var username, password string

	// get storage unit info from the instance or, if present, the cluster
	if node.Status.Cluster.Status == "" {
		host, port, username, password = getDatasourceInfoFromInstance(node, storageProvider)
	} else {
		host, port, username, password = getDatasourceInfoFromCluster(node, storageProvider)
	}

	if err := ssServer.RegisterStorageUnit(logicDBName, getDSName(node), host, uint(port), dbName, username, password); err != nil {
		return fmt.Errorf("register storage node failed: %w", err)
	}
	r.Recorder.Eventf(node, corev1.EventTypeNormal, "StorageUnitRegistered", "StorageUnit %s:%d/%s is registered", host, port, dbName)

	node.Status.Registered = true

	return nil
}
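// For orientation, ssServer.RegisterStorageUnit above corresponds to DistSQL
// of roughly this shape (a sketch; the exact statement is assembled inside
// the shardingsphere package and its attributes may differ):
//
//	REGISTER STORAGE UNIT ds_my_node (
//	    HOST="<primary endpoint>",
//	    PORT=3306,
//	    DB="<instance db name>",
//	    USER="<master username>",
//	    PASSWORD="<master password>"
//	);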
// getDSName returns the datasource name of the storage node.
// A datasource name only allows letters, numbers and _, and must start with
// a letter, so hyphens in the node name are replaced with underscores
// (e.g. a node named "my-node" yields "ds_my_node").
// ref: https://shardingsphere.apache.org/document/current/en/user-manual/shardingsphere-proxy/distsql/syntax/rdl/storage-unit-definition/register-storage-unit/
func getDSName(node *v1alpha1.StorageNode) string {
	return fmt.Sprintf("ds_%s", strings.ReplaceAll(node.GetName(), "-", "_"))
}

func getDatasourceInfoFromInstance(node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) (host string, port int32, username, password string) {
	// callers only reach this once the node is Ready, which guarantees at
	// least one instance in the status
	ins := node.Status.Instances[0]
	host = ins.Endpoint.Address
	port = ins.Endpoint.Port

	username = node.Annotations[v1alpha1.AnnotationsMasterUsername]
	if username == "" {
		username = storageProvider.Spec.Parameters["masterUsername"]
	}

	password = node.Annotations[v1alpha1.AnnotationsMasterUserPassword]
	if password == "" {
		password = storageProvider.Spec.Parameters["masterUserPassword"]
	}

	return
}

func getDatasourceInfoFromCluster(node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) (host string, port int32, username, password string) {
	cluster := node.Status.Cluster
	host = cluster.PrimaryEndpoint.Address
	port = cluster.PrimaryEndpoint.Port

	username = node.Annotations[v1alpha1.AnnotationsMasterUsername]
	if username == "" {
		username = storageProvider.Spec.Parameters["masterUsername"]
	}

	password = node.Annotations[v1alpha1.AnnotationsMasterUserPassword]
	if password == "" {
		password = storageProvider.Spec.Parameters["masterUserPassword"]
	}

	return
}

func (r *StorageNodeReconciler) unregisterStorageUnit(ctx context.Context, node *v1alpha1.StorageNode) error {
	if !node.Status.Registered {
		return nil
	}

	if err := r.validateComputeNodeAnnotations(node); err != nil {
		return err
	}

	logicDBName := node.Annotations[AnnotationKeyLogicDatabaseName]

	ssServer, err := r.getShardingsphereServer(ctx, node)
	if err != nil {
		return fmt.Errorf("getShardingsphereServer failed: %w", err)
	}
	defer ssServer.Close()

	if err := ssServer.UnRegisterStorageUnit(logicDBName, getDSName(node)); err != nil {
		return fmt.Errorf("unregister storage unit failed: %w", err)
	}
	r.Recorder.Eventf(node, corev1.EventTypeNormal, "StorageUnitUnRegistered", "StorageUnit of node %s/%s is unregistered", node.GetNamespace(), node.GetName())

	node.Status.Registered = false

	return nil
}

func (r *StorageNodeReconciler) validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
	requiredAnnos := []string{
		AnnotationKeyLogicDatabaseName,
		v1alpha1.AnnotationsInstanceDBName,
		AnnotationKeyComputeNodeName,
	}

	for _, anno := range requiredAnnos {
		if v, ok := node.Annotations[anno]; !ok || v == "" {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterChecking", "Waiting to register storage unit for node %s/%s: annotation %s is required", node.GetNamespace(), node.GetName(), anno)
			return fmt.Errorf("annotation %s is required", anno)
		}
	}

	return nil
}

func (r *StorageNodeReconciler) getShardingsphereServer(ctx context.Context, node *v1alpha1.StorageNode) (shardingsphere.IServer, error) {
	var (
		driver, host, username, password string
		port                             uint
	)

	// get compute node
	cn := &v1alpha1.ComputeNode{}
	if err := r.Client.Get(ctx, types.NamespacedName{
		Name:      node.Annotations[AnnotationKeyComputeNodeName],
		Namespace: node.Namespace,
	}, cn); err != nil {
		return nil, fmt.Errorf("get compute node failed: %w", err)
	}

	serverConf := cn.Spec.Bootstrap.ServerConfig
	driver, ok := serverConf.Props[ShardingSphereProtocolType]
	if !ok || driver == "" {
		driver = "mysql"
	}
	driver = strings.ToLower(driver)

	if len(serverConf.Authority.Users) == 0 {
		return nil, fmt.Errorf("no user in compute node %s/%s", cn.Namespace, cn.Name)
	}
	username = strings.Split(serverConf.Authority.Users[0].User, "@")[0]
	password = serverConf.Authority.Users[0].Password

	// get service of compute node
	svc, err := r.Service.GetByNamespacedName(ctx, types.NamespacedName{
		Name:      node.Annotations[AnnotationKeyComputeNodeName],
		Namespace: node.Namespace,
	})
	if err != nil || svc == nil {
		return nil, fmt.Errorf("get service failed: %w", err)
	}
	host = fmt.Sprintf("%s.%s", svc.Name, svc.Namespace)
	port = uint(svc.Spec.Ports[0].Port)

	ssServer, err := shardingsphere.NewServer(driver, host, port, username, password)
	if err != nil {
		return nil, fmt.Errorf("new shardingsphere server failed: %w", err)
	}

	return ssServer, nil
}

func (r *StorageNodeReconciler) reconcileCloudNativePG(ctx context.Context, sn *v1alpha1.StorageNode, sp *v1alpha1.StorageProvider) error {
	cluster, err := r.getCloudNativePGCluster(ctx, types.NamespacedName{Namespace: sn.Namespace, Name: sn.Name})
	if err != nil {
		return err
	}

	// First refresh the StorageNode status from the observed cluster (the
	// package-level updateCloudNativePGCluster), then converge the cluster
	// spec itself (the method of the same name) or create it if absent.
	if err := updateCloudNativePGCluster(ctx, sn, cluster); err != nil {
		return err
	}

	if cluster != nil {
		return r.updateCloudNativePGCluster(ctx, sn, sp, cluster)
	}

	return r.createCloudNativePGCluster(ctx, sn, sp)
}

func updateCloudNativePGCluster(ctx context.Context, node *v1alpha1.StorageNode, cluster *cnpg.Cluster) error {
	if cluster == nil {
		return nil
	}

	// cs stays empty while the cluster is not yet healthy, which
	// computeNewConditions treats as "no cluster condition"
	var cs string
	if cluster.Status.Phase == cnpg.PhaseHealthy {
		node.Status.Phase = v1alpha1.StorageNodePhaseReady
		cs = "available"
	} else {
		node.Status.Phase = v1alpha1.StorageNodePhaseNotReady
	}

	node.Status.Cluster = v1alpha1.ClusterStatus{
		Status: cs,
		PrimaryEndpoint: v1alpha1.Endpoint{
			Address: cluster.Status.WriteService,
			Port:    5432,
		},
		ReaderEndpoints: []v1alpha1.Endpoint{
			{
				Address: cluster.Status.ReadService,
				Port:    5432,
			},
		},
	}

	node.Status.Instances = []v1alpha1.InstanceStatus{}
	for s, pgins := range cluster.Status.InstancesStatus {
		var stat string
		if s == cnpgutils.PodHealthy {
			stat = "available"
		} else {
			stat = string(s)
		}

		for _, pg := range pgins {
			ins := v1alpha1.InstanceStatus{
				Status: stat,
				Endpoint: v1alpha1.Endpoint{
					Address: pg,
					Port:    5432,
				},
			}
			node.Status.Instances = append(node.Status.Instances, ins)
		}
	}

	return nil
}

func (r *StorageNodeReconciler) getCloudNativePGCluster(ctx context.Context, namespacedName types.NamespacedName) (*cnpg.Cluster, error) {
	c, err := r.CNPG.GetClusterByNamespacedName(ctx, namespacedName)
	if err != nil {
		return nil, err
	}
	return c, nil
}

func (r *StorageNodeReconciler) createCloudNativePGCluster(ctx context.Context, sn *v1alpha1.StorageNode, sp *v1alpha1.StorageProvider) error {
	cluster := r.CNPG.Build(ctx, sn, sp)

	err := r.CNPG.Create(ctx, cluster)
	// treat "already exists" as success so concurrent reconciles stay idempotent
	if err == nil || apierrors.IsAlreadyExists(err) {
		return nil
	}
	return err
}

func (r *StorageNodeReconciler) updateCloudNativePGCluster(ctx context.Context, sn *v1alpha1.StorageNode, sp *v1alpha1.StorageProvider, cluster *cnpg.Cluster) error {
	exp := r.CNPG.Build(ctx, sn, sp)
	exp.ObjectMeta = cluster.ObjectMeta
	exp.Labels = cluster.Labels
	exp.Annotations = cluster.Annotations

	if !reflect.DeepEqual(cluster.Spec, exp.Spec) {
		return r.CNPG.Update(ctx, exp)
	}

	return nil
}

// SetupWithManager sets up the controller with the Manager
func (r *StorageNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&v1alpha1.StorageNode{}).
		Complete(r)
}
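// Example wiring from a manager entrypoint (a minimal sketch, not the
// operator's actual main package; mgr, awsRegion, accessKeyID, secretKey and
// setupLog are assumed to exist in the caller, and the Service and CNPG
// clients would be constructed similarly):
//
//	if err := (&StorageNodeReconciler{
//		Client:             mgr.GetClient(),
//		Scheme:             mgr.GetScheme(),
//		Log:                ctrl.Log.WithName(StorageNodeControllerName),
//		Recorder:           mgr.GetEventRecorderFor(StorageNodeControllerName),
//		AwsRegion:          awsRegion,
//		AwsAccessKeyID:     accessKeyID,
//		AwsSecretAccessKey: secretKey,
//	}).SetupWithManager(mgr); err != nil {
//		setupLog.Error(err, "unable to create controller", "controller", StorageNodeControllerName)
//		os.Exit(1)
//	}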