pkg/controllers/mysqld_statefulset_controller.go (155 lines of code) (raw):

// Copyright (c) 2020, 2023, Oracle and/or its affiliates. // // Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ package controllers import ( "context" "fmt" "strconv" "github.com/mysql/ndb-operator/pkg/apis/ndbcontroller" "github.com/mysql/ndb-operator/pkg/mysqlclient" "github.com/mysql/ndb-operator/pkg/resources" "github.com/mysql/ndb-operator/pkg/resources/statefulset" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/kubernetes" listersappsv1 "k8s.io/client-go/listers/apps/v1" listerscorev1 "k8s.io/client-go/listers/core/v1" klog "k8s.io/klog/v2" ) const ( // rootHost is the annotation key which stores the Root user's current host rootHost = ndbcontroller.GroupName + "/root-host" // rootUserGeneration is the annotation key which stores the NdbCluster // generation whose spec has been applied to the Root user. rootUserGeneration = ndbcontroller.GroupName + "/root-user-generation" ) type mysqldStatefulSetController struct { ndbNodeStatefulSetImpl } // NewMySQLDStatefulSetController creates a new mysqldStatefulSetController func newMySQLDStatefulSetController( client kubernetes.Interface, statefulSetLister listersappsv1.StatefulSetLister, configmapLister listerscorev1.ConfigMapLister) *mysqldStatefulSetController { return &mysqldStatefulSetController{ ndbNodeStatefulSetImpl{ client: client, statefulSetLister: statefulSetLister, ndbNodeStatefulset: statefulset.NewMySQLdStatefulSet(configmapLister), }, } } // HandleScaleDown scales down the MySQL Server StatefulSet if it has been requested // in the NdbCluster spec. This method is called before the config version is ensured // in the management and data nodes, i.e., before any new config is applied to the // management and data nodes. This is to ensure that during a scale down, the MySQL // Servers are shutdown before a possible reduction in the number of API sections in // the config. func (mssc *mysqldStatefulSetController) HandleScaleDown(ctx context.Context, sc *SyncContext) syncResult { nc := sc.ndb mysqldSfset := sc.mysqldSfset if mysqldSfset == nil { // Nothing to scale down return continueProcessing() } NdbGeneration := sc.configSummary.NdbClusterGeneration // StatefulSet exists if !sc.isStatefulsetUpdated(sc.mysqldSfset, NdbGeneration, Complete) { // Previous StatefulSet update is not complete yet. // Finish processing. Reconciliation will // continue once the StatefulSet update has been // rolled out. return finishProcessing() } // Handle any scale down mysqldNodeCount := sc.configSummary.NumOfMySQLServers if mysqldSfset.Status.Replicas <= mysqldNodeCount { // No scale down requested or, it has been processed already // Continue processing rest of sync loop return continueProcessing() } // scale down requested if mysqldNodeCount == 0 { // The StatefulSet has to be deleted // Delete the root user first. rootHost := mysqldSfset.GetAnnotations()[rootHost] secretClient := NewMySQLUserPasswordSecretInterface(mssc.client) // Extract ndb operator mysql user password. operatorSecretName := resources.GetMySQLNDBOperatorPasswordSecretName(nc) operatorPassword, err := secretClient.ExtractPassword(ctx, mysqldSfset.Namespace, operatorSecretName) if err != nil { klog.Errorf("Failed to extract ndb operator password from the secret") return errorWhileProcessing(err) } if err := mysqlclient.DeleteRootUserIfExists(mysqldSfset, rootHost, operatorPassword); err != nil { klog.Errorf("Failed to delete root user") return errorWhileProcessing(err) } // Delete the secret. annotations := mysqldSfset.GetAnnotations() secretName := annotations[statefulset.RootPasswordSecret] if secretClient.IsControlledBy(ctx, secretName, nc) { // The given NdbCluster is set as the Owner of the secret, // which implies that this was created by the operator. err := secretClient.Delete(ctx, mysqldSfset.Namespace, secretName) if err != nil && !errors.IsNotFound(err) { // Delete failed with an error. // Ignore NotFound error as this delete might be a redundant // step, caused by an outdated cache read. klog.Errorf("Failed to delete MySQL Root pass secret %q : %s", secretName, err) return errorWhileProcessing(err) } } // scale down to 0 servers, i.e., delete the statefulset if err := mssc.deleteStatefulSet(ctx, mysqldSfset, sc); err != nil { return errorWhileProcessing(err) } // reconciliation will continue once the statefulset has been deleted return finishProcessing() } // create a new statefulset with updated replica to patch the original statefulset // Note : the annotation 'last-applied-config-generation' will be updated only // during ReconcileStatefulset updatedSfset := mysqldSfset.DeepCopy() updatedSfset.Spec.Replicas = &mysqldNodeCount return mssc.patchStatefulSet(ctx, mysqldSfset, updatedSfset) } // ReconcileStatefulSet compares the MySQL Server spec defined in NdbCluster resource // and applies any changes to the statefulset if required. This method is called after // the new config has been ensured in both Management and Data Nodes. func (mssc *mysqldStatefulSetController) ReconcileStatefulSet(ctx context.Context, sc *SyncContext) syncResult { mysqldSfset := sc.mysqldSfset cs := sc.configSummary nc := sc.ndb if mysqldSfset == nil { // statefulset doesn't exist yet if cs.NumOfMySQLServers == 0 { // the current state is in sync with expectation return continueProcessing() } // StatefulSet has to be created // First ensure that a root password secret exists secretClient := NewMySQLUserPasswordSecretInterface(mssc.client) if _, err := secretClient.EnsureMySQLRootPassword(ctx, nc); err != nil { klog.Errorf("Failed to ensure root password secret for StatefulSet %q : %s", mssc.ndbNodeStatefulset.GetName(nc), err) return errorWhileProcessing(err) } // create a statefulset if _, err := mssc.createStatefulSet(ctx, sc); err != nil { return errorWhileProcessing(err) } // StatefulSet was created successfully. // Finish processing. Reconciliation will // continue once the statefulset is updated. return finishProcessing() } // At this point the statefulset exists and has already been verified // to be complete (i.e. no previous updates still being applied) by HandleScaleDown. // Check if the statefulset has the recent config generation. if workloadHasConfigGeneration(mysqldSfset, cs.NdbClusterGeneration) { // Statefulset upto date klog.Info("All MySQL Servers are up-to-date and ready") return continueProcessing() } // Statefulset has to be patched // Patch the Governing Service first if err := sc.serviceController.patchService(ctx, sc, mssc.ndbNodeStatefulset); err != nil { return errorWhileProcessing(err) } // Patch the StatefulSet updatedStatefulSet, err := mssc.ndbNodeStatefulset.NewStatefulSet(cs, nc) if err != nil { return errorWhileProcessing(err) } return mssc.patchStatefulSet(ctx, mysqldSfset, updatedStatefulSet) } // reconcileRootUser creates or updates the root user with the recent NdbCluster spec func (mssc *mysqldStatefulSetController) reconcileRootUser(ctx context.Context, sc *SyncContext) syncResult { mysqldSfset := sc.mysqldSfset if mysqldSfset == nil { // Nothing to do as the MySQL Servers do not exist return continueProcessing() } // Get the last applied NdbCluster Generation to the Root User annotations := mysqldSfset.GetAnnotations() var rootUserGen int64 if genString, exists := annotations[rootUserGeneration]; exists { rootUserGen, _ = strconv.ParseInt(genString, 10, 64) } recentNdbGen := sc.configSummary.NdbClusterGeneration if rootUserGen == recentNdbGen { // The Root user spec is up-to-date return continueProcessing() } // The root user needs be created or updated nc := sc.ndb newRootHost := nc.Spec.MysqlNode.RootHost // Extract ndb operator mysql user password. secretClient := NewMySQLUserPasswordSecretInterface(sc.kubeClientset()) operatorSecretName := resources.GetMySQLNDBOperatorPasswordSecretName(nc) operatorPassword, err := secretClient.ExtractPassword(ctx, mysqldSfset.Namespace, operatorSecretName) if err != nil { return errorWhileProcessing(err) } if existingRootHost, exists := annotations[rootHost]; !exists { // Root user doesn't exist yet - create it. // Extract root user password. secretName, _ := resources.GetMySQLRootPasswordSecretName(nc) rootPassword, err := secretClient.ExtractPassword(ctx, mysqldSfset.Namespace, secretName) if err != nil { return errorWhileProcessing(err) } // Create Root user if err = mysqlclient.CreateRootUserIfNotExist(mysqldSfset, newRootHost, rootPassword, operatorPassword); err != nil { klog.Errorf("Failed to create root user") return errorWhileProcessing(err) } } else if newRootHost != existingRootHost { // Root Host needs to be updated if err := mysqlclient.UpdateRootUser(mysqldSfset, existingRootHost, newRootHost, operatorPassword); err != nil { klog.Errorf("Failed to update root user") return errorWhileProcessing(err) } } // Successfully applied the changes to root user // Patch the StatefulSet to mark the changes as done updatedMysqldSfset := mysqldSfset.DeepCopy() annotations = updatedMysqldSfset.Annotations annotations[rootHost] = newRootHost annotations[rootUserGeneration] = fmt.Sprintf("%d", recentNdbGen) return mssc.patchStatefulSet(ctx, mysqldSfset, updatedMysqldSfset) }