in pkg/controllers/ndbmtd_statefulset_controller.go [114:190]
// reorgNdbTables redistributes the data of all NDB tables among the data nodes.
//
// It lists every table whose ENGINE is NDBCLUSTER and, for each table whose
// number of distinct primary nodes (as reported by ndbinfo) differs from
// sc.configSummary.NumOfDataNodes, runs
// ALTER TABLE ... ALGORITHM=INPLACE, REORGANIZE PARTITION followed by
// OPTIMIZE TABLE. Tables that already match are skipped.
//
// It returns errorWhileProcessing(err) on the first failure and
// continueProcessing() once every table has been handled.
func (nssc *ndbmtdStatefulSetController) reorgNdbTables(ctx context.Context, sc *SyncContext) syncResult {
	// Extract ndb operator mysql user password.
	nc := sc.ndb
	operatorSecretName := resources.GetMySQLNDBOperatorPasswordSecretName(nc)
	operatorPassword, err := NewMySQLUserPasswordSecretInterface(nssc.client).ExtractPassword(ctx, nc.Namespace, operatorSecretName)
	if err != nil {
		klog.Errorf("Failed to extract ndb operator password from the secret")
		return errorWhileProcessing(err)
	}

	// Connect to the 0th MySQL Pod to perform reorg partition and optimize
	mysqlClient, err := mysqlclient.ConnectToStatefulSet(sc.mysqldSfset, "", operatorPassword)
	if err != nil {
		return errorWhileProcessing(err)
	}

	// Get the list of all NDB tables
	query := "SELECT TABLE_SCHEMA, TABLE_NAME FROM " + mysqlclient.DbInformationSchema + ".TABLES WHERE ENGINE = 'NDBCLUSTER'"
	rows, err := mysqlClient.QueryContext(ctx, query)
	if err != nil {
		klog.Errorf("Failed to execute query %q : %s", query, err)
		return errorWhileProcessing(err)
	}
	// Release the result set (and the connection backing it) on every exit path.
	defer rows.Close()

	// Read the complete list before touching any table, so that the result
	// set is not kept open while the long running DDL statements execute.
	type ndbTable struct {
		schema, name string
	}
	var tables []ndbTable
	for rows.Next() {
		var tbl ndbTable
		if err = rows.Scan(&tbl.schema, &tbl.name); err != nil {
			klog.Errorf("Failed to scan list of NDB tables : %s", err)
			return errorWhileProcessing(err)
		}
		tables = append(tables, tbl)
	}
	// rows.Next() also returns false when the iteration was aborted by an
	// error; without this check a partial list would be treated as complete.
	if err = rows.Err(); err != nil {
		klog.Errorf("Failed to read list of NDB tables : %s", err)
		return errorWhileProcessing(err)
	}

	// Run reorg and optimize for all tables, one by one.
	klog.Infof("Redistributing NDB data among all data nodes, including the new ones")
	for _, tbl := range tables {
		// Check if the table is already reorganized
		query = "SELECT COUNT(DISTINCT current_primary) " +
			"FROM ndbinfo.dictionary_tables AS t, ndbinfo.table_fragments AS f " +
			"WHERE t.table_id = f.table_id AND t.database_name=? AND t.table_name=?"
		row := mysqlClient.QueryRowContext(ctx, query, tbl.schema, tbl.name)
		var distributedNodeCount int32
		if err = row.Scan(&distributedNodeCount); err != nil {
			klog.Errorf("Query '%s' failed : %s", query, err)
			return errorWhileProcessing(err)
		}

		tableFullName := fmt.Sprintf("%s.%s", tbl.schema, tbl.name)
		if distributedNodeCount == sc.configSummary.NumOfDataNodes {
			// Table already reorganized
			klog.Infof("Table %q has already been redistributed", tableFullName)
			continue
		}

		// Identifiers cannot be bound as statement parameters, so quote them
		// explicitly to cope with names that contain special characters or
		// collide with reserved words.
		quotedTableName := quoteNdbIdentifier(tbl.schema) + "." + quoteNdbIdentifier(tbl.name)

		// Run ALTER TABLE ... ALGORITHM=INPLACE, REORGANIZE PARTITION
		query = fmt.Sprintf("ALTER TABLE %s ALGORITHM=INPLACE, REORGANIZE PARTITION", quotedTableName)
		klog.Infof("Running '%s'", query)
		if _, err = mysqlClient.ExecContext(ctx, query); err != nil {
			klog.Errorf("Query '%s' failed : %s", query, err)
			return errorWhileProcessing(err)
		}

		// Run OPTIMIZE TABLE
		query = fmt.Sprintf("OPTIMIZE TABLE %s", quotedTableName)
		klog.Infof("Running '%s'", query)
		row = mysqlClient.QueryRowContext(ctx, query)
		// Only the third and fourth columns of the returned row are inspected;
		// the first two are scanned into a throwaway variable.
		var dummy, result, msg string
		if err = row.Scan(&dummy, &dummy, &result, &msg); err == nil && result == "error" {
			err = errors.New(msg)
		}
		if err != nil {
			klog.Errorf("Query '%s' failed : %s", query, err)
			return errorWhileProcessing(err)
		}
	}

	klog.Infof("Successfully redistributed all NDB data")
	// Done running reorg and optimize table
	return continueProcessing()
}

// quoteNdbIdentifier wraps name in backticks and doubles every backtick
// inside it, so that it can be embedded in a SQL statement as an identifier.
func quoteNdbIdentifier(name string) string {
	quoted := make([]byte, 0, len(name)+2)
	quoted = append(quoted, '`')
	// Scanning bytes is safe: a backtick is ASCII and never appears inside a
	// multi-byte UTF-8 sequence.
	for i := 0; i < len(name); i++ {
		if name[i] == '`' {
			quoted = append(quoted, '`')
		}
		quoted = append(quoted, name[i])
	}
	quoted = append(quoted, '`')
	return string(quoted)
}