in pkg/controllers/ndbmtd_statefulset_controller.go [64:110]
func (nssc *ndbmtdStatefulSetController) createNodeGroups(sc *SyncContext) syncResult {
// Connect to the Management Server
mgmClient, err := mgmapi.NewMgmClient(sc.ndb.GetConnectstring())
if err != nil {
klog.Errorf("Failed to connect to Management Server : %s", err)
return errorWhileProcessing(err)
}
defer mgmClient.Disconnect()
clusterStatus, err := mgmClient.GetStatus()
if err != nil {
klog.Errorf("Error getting cluster status from management server: %s", err)
return errorWhileProcessing(err)
}
// Get the sorted list of all new nodes whose nodegroup is 65536
newDataNodeIds := clusterStatus.GetConnectedDataNodesWithNodeGroup(mgmapi.NodeGroupNewConnectedDataNode)
if newDataNodeIds == nil {
// All nodes are already inducted into the MySQL Cluster
klog.Info("Nodegroups exist for newly added data nodes")
return continueProcessing()
}
// Induct the new nodes by creating nodegroups one by one
numberOfNodesPerNodeGroup := sc.configSummary.RedundancyLevel
var nodeIds []int
for _, nodeId := range newDataNodeIds {
nodeIds = append(nodeIds, nodeId)
if len(nodeIds) == int(numberOfNodesPerNodeGroup) {
// Create a new nodegroup
ng, err := mgmClient.CreateNodeGroup(nodeIds)
if err != nil {
klog.Errorf("Failed to create nodegroup for nodes %v : %s", nodeIds, err)
return errorWhileProcessing(err)
}
klog.Infof("Created nodegroup '%d' with data nodes %v", ng, nodeIds)
// reset nodeIds for the next iteration
nodeIds = nil
}
}
// Done creating nodegroups
klog.Info("Successfully created nodegroups for newly added data nodes")
return continueProcessing()
}