func()

in pkg/controllers/sync_context.go [188:271]


// ensureDataNodePodVersion checks that the pods running the data nodes
// have the pod revision recorded in the data node StatefulSet's
// Status.UpdateRevision, and triggers a restart of outdated pods.
//
// The nodes are handled in passes: pass i looks at the i'th node of
// every node group, so only one node from each node group is touched
// per pass. As soon as a pass deletes at least one pod, the method
// returns finishProcessing() and the remaining passes are left to a
// later reconciliation loop.
//
// Returns:
//   - continueProcessing() if the StatefulSet update is already complete,
//     or if no pod had to be deleted in any pass.
//   - finishProcessing() if one or more outdated pods were deleted.
//   - errorWhileProcessing(err) if the management server could not be
//     reached or queried, the cluster status could not be grouped by
//     node group, or a pod version check failed.
func (sc *SyncContext) ensureDataNodePodVersion(ctx context.Context) syncResult {
	ndbmtdSfset := sc.dataNodeSfSet
	if statefulsetUpdateComplete(ndbmtdSfset) {
		// All data nodes have the desired pod version.
		// Continue with rest of the sync process.
		klog.Info("All Data node pods are up-to-date and ready")
		return continueProcessing()
	}

	desiredPodRevisionHash := ndbmtdSfset.Status.UpdateRevision
	klog.Infof("Ensuring Data Node pods have the desired podSpec version, %s", desiredPodRevisionHash)

	// Get the node and nodegroup details via clusterStatus
	mgmClient, err := mgmapi.NewMgmClient(sc.ndb.GetConnectstring())
	if err != nil {
		return errorWhileProcessing(err)
	}
	defer mgmClient.Disconnect()

	clusterStatus, err := mgmClient.GetStatus()
	if err != nil {
		klog.Errorf("Error getting cluster status from management server: %s", err)
		return errorWhileProcessing(err)
	}

	// Group the nodes based on nodegroup.
	// The node ids are sorted within the sub arrays and the array
	// itself is sorted based on the node groups. Every sub array
	// is expected to have RedundancyLevel number of node ids.
	nodesGroupedByNodegroups := clusterStatus.GetNodesGroupedByNodegroup()
	if nodesGroupedByNodegroups == nil {
		err := fmt.Errorf("internal error: could not extract nodes and node groups from cluster status")
		return errorWhileProcessing(err)
	}

	// Loop invariants, hoisted out of the passes below.
	redundancyLevel := int(sc.configSummary.RedundancyLevel)
	numOfMgmdNodes := int(sc.configSummary.NumOfManagementNodes)

	// Pick up the i'th node id from every sub array of
	// nodesGroupedByNodegroups during every iteration and
	// ensure that they all have the latest Pod definition.
	for i := 0; i < redundancyLevel; i++ {
		// Note down the node ids to be ensured.
		// One id is collected per node group, so size by the group count.
		candidateNodeIds := make([]int, 0, len(nodesGroupedByNodegroups))
		for _, nodesInNodegroup := range nodesGroupedByNodegroups {
			if i >= len(nodesInNodegroup) {
				// Guard against a node group that reports fewer nodes than
				// the configured redundancy level; indexing it would panic.
				err := fmt.Errorf(
					"internal error: node group has %d nodes but redundancy level is %d",
					len(nodesInNodegroup), redundancyLevel)
				return errorWhileProcessing(err)
			}
			candidateNodeIds = append(candidateNodeIds, nodesInNodegroup[i])
		}

		// Check the pods running MySQL Cluster nodes with candidateNodeIds
		// and delete them if they have an older pod definition.
		var nodesBeingUpdated []int
		for _, nodeId := range candidateNodeIds {
			// Generate the pod name using nodeId.
			// Data node with nodeId 'i' runs in a pod with ordinal index 'i-1-numberOfMgmdNodes'
			ndbmtdPodName := fmt.Sprintf("%s-%d", ndbmtdSfset.Name, nodeId-1-numOfMgmdNodes)

			// Check the pod version and delete it if its outdated
			podDeleted, err := sc.ensurePodVersion(
				ctx, ndbmtdSfset.Namespace, ndbmtdPodName, desiredPodRevisionHash,
				fmt.Sprintf("Data Node(nodeId=%d)", nodeId))
			if err != nil {
				return errorWhileProcessing(err)
			}

			if podDeleted {
				nodesBeingUpdated = append(nodesBeingUpdated, nodeId)
			}
		}

		if len(nodesBeingUpdated) > 0 {
			// The outdated data nodes are being updated.
			// Exit here and allow them to be restarted by the statefulset controllers.
			// Continue syncing once they are up, in a later reconciliation loop.
			klog.Infof("The data nodes %v, identified with old pod version, are being restarted", nodesBeingUpdated)
			// Stop processing. Reconciliation will continue
			// once the StatefulSet is fully ready again.
			return finishProcessing()
		}

		klog.Infof("The data nodes %v have the desired pod version %s", candidateNodeIds, desiredPodRevisionHash)
	}

	// Reached only when no pod was deleted in any pass above. The
	// StatefulSet was reported as not yet complete at the top, so this is
	// not expected in the normal flow; continue with the rest of the sync.
	return continueProcessing()
}