func()

in pkg/controllers/ndbmtd_statefulset_controller.go [114:190]


func (nssc *ndbmtdStatefulSetController) reorgNdbTables(ctx context.Context, sc *SyncContext) syncResult {
	// Extract ndb operator mysql user password.
	nc := sc.ndb
	operatorSecretName := resources.GetMySQLNDBOperatorPasswordSecretName(nc)
	operatorPassword, err := NewMySQLUserPasswordSecretInterface(nssc.client).ExtractPassword(ctx, nc.Namespace, operatorSecretName)
	if err != nil {
		klog.Errorf("Failed to extract ndb operator password from the secret")
		return errorWhileProcessing(err)
	}

	// Connect to the 0th MySQL Pod to perform reorg partition and optimize
	mysqlClient, err := mysqlclient.ConnectToStatefulSet(sc.mysqldSfset, "", operatorPassword)
	if err != nil {
		return errorWhileProcessing(err)
	}

	// Get the list of all NDB tables
	query := "SELECT TABLE_SCHEMA, TABLE_NAME FROM " + mysqlclient.DbInformationSchema + ".TABLES WHERE ENGINE = 'NDBCLUSTER'"
	rows, err := mysqlClient.QueryContext(ctx, query)
	if err != nil {
		klog.Errorf("Failed to execute query %q : %s", query, err)
		return errorWhileProcessing(err)
	}

	// Run reorg and optimize for all tables, one by one.
	klog.Infof("Redistributing NDB data among all data nodes, including the new ones")
	var tableSchema, tableName string
	for rows.Next() {
		if err = rows.Scan(&tableSchema, &tableName); err != nil {
			klog.Errorf("Failed to scan list of NDB tables : %s", err)
			return errorWhileProcessing(err)
		}

		// Check if the table is already reorganized
		query = "SELECT COUNT(DISTINCT current_primary) " +
			"FROM ndbinfo.dictionary_tables AS t, ndbinfo.table_fragments AS f " +
			"WHERE t.table_id = f.table_id AND t.database_name=? AND t.table_name=?"
		row := mysqlClient.QueryRowContext(ctx, query, tableSchema, tableName)
		var distributedNodeCount int32
		if err = row.Scan(&distributedNodeCount); err != nil {
			klog.Errorf("Query '%s' failed : %s", query, err)
			return errorWhileProcessing(err)
		}
		tableFullName := fmt.Sprintf("%s.%s", tableSchema, tableName)
		if distributedNodeCount == sc.configSummary.NumOfDataNodes {
			// Table already reorganized
			klog.Infof("Table %q has already been redistributed", tableFullName)
			continue
		}

		// Run ALTER TABLE ... ALGORITHM=INPLACE, REORGANIZE PARTITION
		query = fmt.Sprintf("ALTER TABLE %s ALGORITHM=INPLACE, REORGANIZE PARTITION", tableFullName)
		klog.Infof("Running '%s'", query)
		if _, err = mysqlClient.ExecContext(ctx, query); err != nil {
			klog.Errorf("Query '%s' failed : %s", query, err)
			return errorWhileProcessing(err)
		}

		// Run OPTIMIZE TABLE
		query = fmt.Sprintf("OPTIMIZE TABLE %s", tableFullName)
		klog.Infof("Running '%s'", query)
		row = mysqlClient.QueryRowContext(ctx, query)
		var dummy, result, msg string
		if err = row.Scan(&dummy, &dummy, &result, &msg); err == nil && result == "error" {
			err = errors.New(msg)
		}

		if err != nil {
			klog.Errorf("Query '%s' failed : %s", query, err)
			return errorWhileProcessing(err)
		}
	}
	klog.Infof("Successfully redistributed all NDB data")

	// Done running reorg and optimize table
	return continueProcessing()
}