in pkg/controllers/sync_context.go [188:271]
func (sc *SyncContext) ensureDataNodePodVersion(ctx context.Context) syncResult {
ndbmtdSfset := sc.dataNodeSfSet
if statefulsetUpdateComplete(ndbmtdSfset) {
// All data nodes have the desired pod version.
// Continue with rest of the sync process.
klog.Info("All Data node pods are up-to-date and ready")
return continueProcessing()
}
desiredPodRevisionHash := ndbmtdSfset.Status.UpdateRevision
klog.Infof("Ensuring Data Node pods have the desired podSpec version, %s", desiredPodRevisionHash)
// Get the node and nodegroup details via clusterStatus
mgmClient, err := mgmapi.NewMgmClient(sc.ndb.GetConnectstring())
if err != nil {
return errorWhileProcessing(err)
}
defer mgmClient.Disconnect()
clusterStatus, err := mgmClient.GetStatus()
if err != nil {
klog.Errorf("Error getting cluster status from management server: %s", err)
return errorWhileProcessing(err)
}
// Group the nodes based on nodegroup.
// The node ids are sorted within the sub arrays and the array
// itself is sorted based on the node groups. Every sub array
// will have RedundancyLevel number of node ids.
nodesGroupedByNodegroups := clusterStatus.GetNodesGroupedByNodegroup()
if nodesGroupedByNodegroups == nil {
err := fmt.Errorf("internal error: could not extract nodes and node groups from cluster status")
return errorWhileProcessing(err)
}
// Pick up the i'th node id from every sub array of
// nodesGroupedByNodegroups during every iteration and
// ensure that they all have the latest Pod definition.
redundancyLevel := sc.configSummary.RedundancyLevel
for i := 0; i < int(redundancyLevel); i++ {
// Note down the node ids to be ensured
candidateNodeIds := make([]int, 0, redundancyLevel)
for _, nodesInNodegroup := range nodesGroupedByNodegroups {
candidateNodeIds = append(candidateNodeIds, nodesInNodegroup[i])
}
// Check the pods running MySQL Cluster nodes with candidateNodeIds
// and delete them if they have an older pod definition.
var nodesBeingUpdated []int
for _, nodeId := range candidateNodeIds {
// Generate the pod name using nodeId.
// Data node with nodeId 'i' runs in a pod with ordinal index 'i-1-numberOfMgmdNodes'
ndbmtdPodName := fmt.Sprintf(
"%s-%d", ndbmtdSfset.Name, nodeId-1-int(sc.configSummary.NumOfManagementNodes))
// Check the pod version and delete it if its outdated
podDeleted, err := sc.ensurePodVersion(
ctx, ndbmtdSfset.Namespace, ndbmtdPodName, desiredPodRevisionHash,
fmt.Sprintf("Data Node(nodeId=%d)", nodeId))
if err != nil {
return errorWhileProcessing(err)
}
if podDeleted {
nodesBeingUpdated = append(nodesBeingUpdated, nodeId)
}
}
if len(nodesBeingUpdated) > 0 {
// The outdated data nodes are being updated.
// Exit here and allow them to be restarted by the statefulset controllers.
// Continue syncing once they are up, in a later reconciliation loop.
klog.Infof("The data nodes %v, identified with old pod version, are being restarted", nodesBeingUpdated)
// Stop processing. Reconciliation will continue
// once the StatefulSet is fully ready again.
return finishProcessing()
}
klog.Infof("The data nodes %v have the desired pod version %s", candidateNodeIds, desiredPodRevisionHash)
}
// Control will never reach here but to make compiler happy return continue.
return continueProcessing()
}