pkg/controllers/ndbmtd_statefulset_controller.go

// Copyright (c) 2022, 2023, Oracle and/or its affiliates.
//
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

package controllers

import (
	"context"
	"errors"
	"fmt"

	"github.com/mysql/ndb-operator/pkg/apis/ndbcontroller"
	"github.com/mysql/ndb-operator/pkg/mgmapi"
	"github.com/mysql/ndb-operator/pkg/mysqlclient"
	"github.com/mysql/ndb-operator/pkg/resources"
	"github.com/mysql/ndb-operator/pkg/resources/statefulset"

	"k8s.io/client-go/kubernetes"
	listerappsv1 "k8s.io/client-go/listers/apps/v1"
	listerscorev1 "k8s.io/client-go/listers/core/v1"
	klog "k8s.io/klog/v2"
)

// ndbmtdStatefulSetController handles the statefulset that runs the ndbmtd data nodes.
type ndbmtdStatefulSetController struct {
	ndbNodeStatefulSetImpl
}

// newNdbmtdStatefulSetController creates a new ndbmtdStatefulSetController
func newNdbmtdStatefulSetController(
	client kubernetes.Interface,
	statefulSetLister listerappsv1.StatefulSetLister,
	secretLister listerscorev1.SecretLister) *ndbmtdStatefulSetController {
	return &ndbmtdStatefulSetController{
		ndbNodeStatefulSetImpl{
			client:             client,
			statefulSetLister:  statefulSetLister,
			ndbNodeStatefulset: statefulset.NewNdbmtdStatefulSet(secretLister),
		},
	}
}

const (
	// AddNodeOnlineInProgress is the annotation set on the data node
	// statefulset while an online add-node procedure is in progress.
	AddNodeOnlineInProgress = ndbcontroller.GroupName + "/add-node-online-in-progress"
)

// startNewDataNodes scales up the ndbmtd statefulset to start the new data nodes
func (nssc *ndbmtdStatefulSetController) startNewDataNodes(ctx context.Context, sc *SyncContext) syncResult {
	ndbmtdCount := sc.configSummary.NumOfDataNodes
	ndbmtdSfset := sc.dataNodeSfSet
	if *(ndbmtdSfset.Spec.Replicas) == ndbmtdCount {
		// Statefulset already UpToDate
		klog.Infof("New data nodes have been started")
		return continueProcessing()
	}

	// Add Data Node Online in progress.
	// At this point, the config is patched and a rolling restart
	// of all the existing nodes on the system is complete. Start
	// the new data nodes by scaling up the data node statefulset.
	updatedSfset := ndbmtdSfset.DeepCopy()
	updatedSfset.Spec.Replicas = &ndbmtdCount
	return nssc.patchStatefulSet(ctx, ndbmtdSfset, updatedSfset)
}
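
// Note: a sketch of the equivalent manual step, for orientation only.
// Creating a nodegroup over the management API corresponds to the command an
// operator would otherwise run in the ndb_mgm client during an online
// add-node procedure (the node IDs below are purely illustrative):
//
//	ndb_mgm> CREATE NODEGROUP 5,6
//
// createNodeGroups below builds each such group from the configured
// RedundancyLevel instead of hard-coded node IDs.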

// createNodeGroups inducts the new data nodes into the MySQL Cluster by creating nodegroups on them.
func (nssc *ndbmtdStatefulSetController) createNodeGroups(sc *SyncContext) syncResult {
	// Connect to the Management Server
	mgmClient, err := mgmapi.NewMgmClient(sc.ndb.GetConnectstring())
	if err != nil {
		klog.Errorf("Failed to connect to Management Server : %s", err)
		return errorWhileProcessing(err)
	}
	defer mgmClient.Disconnect()

	clusterStatus, err := mgmClient.GetStatus()
	if err != nil {
		klog.Errorf("Error getting cluster status from management server: %s", err)
		return errorWhileProcessing(err)
	}

	// Get the sorted list of all new nodes whose nodegroup is 65536
	newDataNodeIds := clusterStatus.GetConnectedDataNodesWithNodeGroup(mgmapi.NodeGroupNewConnectedDataNode)
	if newDataNodeIds == nil {
		// All nodes are already inducted into the MySQL Cluster
		klog.Info("Nodegroups exist for newly added data nodes")
		return continueProcessing()
	}

	// Induct the new nodes by creating nodegroups one by one
	numberOfNodesPerNodeGroup := sc.configSummary.RedundancyLevel
	var nodeIds []int
	for _, nodeId := range newDataNodeIds {
		nodeIds = append(nodeIds, nodeId)
		if len(nodeIds) == int(numberOfNodesPerNodeGroup) {
			// Create a new nodegroup
			ng, err := mgmClient.CreateNodeGroup(nodeIds)
			if err != nil {
				klog.Errorf("Failed to create nodegroup for nodes %v : %s", nodeIds, err)
				return errorWhileProcessing(err)
			}
			klog.Infof("Created nodegroup '%d' with data nodes %v", ng, nodeIds)
			// reset nodeIds for the next iteration
			nodeIds = nil
		}
	}

	// Done creating nodegroups
	klog.Info("Successfully created nodegroups for newly added data nodes")
	return continueProcessing()
}
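
// Note: once the new nodegroups exist, existing NDB tables still keep all of
// their fragments on the old nodegroups. The redistribution performed by
// reorgNdbTables below issues, per NDB table, the SQL sequence
// (the table name is illustrative):
//
//	ALTER TABLE db1.t1 ALGORITHM=INPLACE, REORGANIZE PARTITION;
//	OPTIMIZE TABLE db1.t1;
//
// REORGANIZE PARTITION moves a share of the rows onto the new nodegroups,
// and OPTIMIZE TABLE then reclaims the space left behind in the original
// fragments.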

// reorgNdbTables runs the required SQL queries to redistribute NDB
// table data across all data nodes including the newly added nodes.
func (nssc *ndbmtdStatefulSetController) reorgNdbTables(ctx context.Context, sc *SyncContext) syncResult {
	// Extract ndb operator mysql user password.
	nc := sc.ndb
	operatorSecretName := resources.GetMySQLNDBOperatorPasswordSecretName(nc)
	operatorPassword, err := NewMySQLUserPasswordSecretInterface(nssc.client).ExtractPassword(ctx, nc.Namespace, operatorSecretName)
	if err != nil {
		klog.Errorf("Failed to extract ndb operator password from the secret")
		return errorWhileProcessing(err)
	}

	// Connect to the 0th MySQL Pod to perform reorg partition and optimize
	mysqlClient, err := mysqlclient.ConnectToStatefulSet(sc.mysqldSfset, "", operatorPassword)
	if err != nil {
		return errorWhileProcessing(err)
	}

	// Get the list of all NDB tables
	query := "SELECT TABLE_SCHEMA, TABLE_NAME FROM " +
		mysqlclient.DbInformationSchema + ".TABLES WHERE ENGINE = 'NDBCLUSTER'"
	rows, err := mysqlClient.QueryContext(ctx, query)
	if err != nil {
		klog.Errorf("Failed to execute query %q : %s", query, err)
		return errorWhileProcessing(err)
	}

	// Run reorg and optimize for all tables, one by one.
	klog.Infof("Redistributing NDB data among all data nodes, including the new ones")
	var tableSchema, tableName string
	for rows.Next() {
		if err = rows.Scan(&tableSchema, &tableName); err != nil {
			klog.Errorf("Failed to scan list of NDB tables : %s", err)
			return errorWhileProcessing(err)
		}

		// Check if the table is already reorganized
		query = "SELECT COUNT(DISTINCT current_primary) " +
			"FROM ndbinfo.dictionary_tables AS t, ndbinfo.table_fragments AS f " +
			"WHERE t.table_id = f.table_id AND t.database_name=? AND t.table_name=?"
		row := mysqlClient.QueryRowContext(ctx, query, tableSchema, tableName)
		var distributedNodeCount int32
		if err = row.Scan(&distributedNodeCount); err != nil {
			klog.Errorf("Query '%s' failed : %s", query, err)
			return errorWhileProcessing(err)
		}

		tableFullName := fmt.Sprintf("%s.%s", tableSchema, tableName)
		if distributedNodeCount == sc.configSummary.NumOfDataNodes {
			// Table already reorganized
			klog.Infof("Table %q has already been redistributed", tableFullName)
			continue
		}

		// Run ALTER TABLE ... ALGORITHM=INPLACE, REORGANIZE PARTITION
		query = fmt.Sprintf("ALTER TABLE %s ALGORITHM=INPLACE, REORGANIZE PARTITION", tableFullName)
		klog.Infof("Running '%s'", query)
		if _, err = mysqlClient.ExecContext(ctx, query); err != nil {
			klog.Errorf("Query '%s' failed : %s", query, err)
			return errorWhileProcessing(err)
		}

		// Run OPTIMIZE TABLE.
		// The statement returns rows of the form (Table, Op, Msg_type, Msg_text);
		// a Msg_type of 'error' in the first row is treated as a failure.
		query = fmt.Sprintf("OPTIMIZE TABLE %s", tableFullName)
		klog.Infof("Running '%s'", query)
		row = mysqlClient.QueryRowContext(ctx, query)
		var dummy, result, msg string
		if err = row.Scan(&dummy, &dummy, &result, &msg); err == nil && result == "error" {
			err = errors.New(msg)
		}
		if err != nil {
			klog.Errorf("Query '%s' failed : %s", query, err)
			return errorWhileProcessing(err)
		}
	}

	klog.Infof("Successfully redistributed all NDB data")
	// Done running reorg and optimize table
	return continueProcessing()
}

// handleAddNodeOnline scales up the data node statefulset and
// then creates nodegroups for the newly started data nodes.
func (nssc *ndbmtdStatefulSetController) handleAddNodeOnline(ctx context.Context, sc *SyncContext) syncResult {
	ndbmtdSfset := sc.dataNodeSfSet
	if _, exists := ndbmtdSfset.GetAnnotations()[AddNodeOnlineInProgress]; !exists {
		// Add node online not in progress
		return continueProcessing()
	}

	// Add node online in progress
	// Scale up the data nodes
	if sr := nssc.startNewDataNodes(ctx, sc); sr.stopSync() {
		return sr
	}

	// Create node groups
	if sr := nssc.createNodeGroups(sc); sr.stopSync() {
		return sr
	}

	// Reorg all NDB tables
	if sr := nssc.reorgNdbTables(ctx, sc); sr.stopSync() {
		return sr
	}

	// Delete AddNodeOnlineInProgress annotation as add node online procedure is now complete
	updatedSfset := ndbmtdSfset.DeepCopy()
	delete(updatedSfset.GetAnnotations(), AddNodeOnlineInProgress)
	return nssc.patchStatefulSet(ctx, ndbmtdSfset, updatedSfset)
}
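
// A minimal usage sketch (hypothetical wiring; the real caller sits in the
// NdbCluster sync loop, which is not part of this file):
//
//	ndbmtdController := newNdbmtdStatefulSetController(client, sfsetLister, secretLister)
//	if sr := ndbmtdController.handleAddNodeOnline(ctx, sc); sr.stopSync() {
//		return sr
//	}
//
// Every step of handleAddNodeOnline first checks whether its work is already
// done, so the sync loop can keep re-invoking it until the
// AddNodeOnlineInProgress annotation is removed.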