pkg/controllers/sync_context.go (390 lines of code) (raw):

// Copyright (c) 2021, 2024, Oracle and/or its affiliates. // // Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ package controllers import ( "context" "fmt" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" listerscorev1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/events" "k8s.io/client-go/util/retry" klog "k8s.io/klog/v2" v1 "github.com/mysql/ndb-operator/pkg/apis/ndbcontroller/v1" ndbclientset "github.com/mysql/ndb-operator/pkg/generated/clientset/versioned" ndblisters "github.com/mysql/ndb-operator/pkg/generated/listers/ndbcontroller/v1" "github.com/mysql/ndb-operator/pkg/mgmapi" "github.com/mysql/ndb-operator/pkg/ndbconfig" ) // SyncContext stores all information collected in/for a single run of syncHandler type SyncContext struct { // Summary of the MySQL Cluster configuration extracted from the current configmap configSummary *ndbconfig.ConfigSummary // Workload resources mgmdNodeSfset *appsv1.StatefulSet dataNodeSfSet *appsv1.StatefulSet mysqldSfset *appsv1.StatefulSet ndb *v1.NdbCluster // controller handling creation and changes of resources mgmdController *ndbNodeStatefulSetImpl ndbmtdController *ndbmtdStatefulSetController mysqldController *mysqldStatefulSetController configMapController ConfigMapControlInterface serviceController ServiceControlInterface serviceaccountController ServiceAccountControlInterface pdbController PodDisruptionBudgetControlInterface kubernetesClient kubernetes.Interface ndbClient ndbclientset.Interface ndbsLister ndblisters.NdbClusterLister podLister listerscorev1.PodLister pvcLister listerscorev1.PersistentVolumeClaimLister serviceLister listerscorev1.ServiceLister // bool flag to control the NdbCluster status processedGeneration value syncSuccess bool // recorder is an event recorder for recording Event resources to the Kubernetes API. recorder events.EventRecorder } const ( // ClusterLabel is applied to all the resources owned by an // NdbCluster resource Ready = "Ready" // ClusterNodeTypeLabel is applied to all the pods owned by an // NdbCluster resource Complete = "Complete" ) func (sc *SyncContext) kubeClientset() kubernetes.Interface { return sc.kubernetesClient } func (sc *SyncContext) ndbClientset() ndbclientset.Interface { return sc.ndbClient } // isOwnedByNdbCluster returns an error if the given // object is not owned by the NdbCluster resource being synced. func (sc *SyncContext) isOwnedByNdbCluster(object metav1.Object) error { if !metav1.IsControlledBy(object, sc.ndb) { // The object is not owned by the NdbCluster resource being synced. // Record a warning and return an error. err := fmt.Errorf(MessageResourceExists, getNamespacedName(object)) sc.recorder.Eventf(sc.ndb, nil, corev1.EventTypeWarning, ReasonResourceExists, ActionNone, err.Error()) return err } return nil } // ensureManagementServerStatefulSet creates the StatefulSet for // Management Server in the K8s Server if they don't exist yet. func (sc *SyncContext) ensureManagementServerStatefulSet( ctx context.Context) (mgmdStatefulSet *appsv1.StatefulSet, existed bool, err error) { return sc.mgmdController.EnsureStatefulSet(ctx, sc) } // ensureDataNodeStatefulSet creates the StatefulSet for // Data Nodes in the K8s Server if they don't exist yet. func (sc *SyncContext) ensureDataNodeStatefulSet( ctx context.Context) (ndbmtdStatefulSet *appsv1.StatefulSet, existed bool, err error) { return sc.ndbmtdController.EnsureStatefulSet(ctx, sc) } // validateMySQLServerStatefulSet retrieves the MySQL Server statefulset from K8s. // If the statefulset exists, it verifies if it is owned by the NdbCluster resource. func (sc *SyncContext) validateMySQLServerStatefulSet() (*appsv1.StatefulSet, error) { return sc.mysqldController.GetStatefulSet(sc) } // ensurePodDisruptionBudgets creates PodDisruptionBudgets for data nodes func (sc *SyncContext) ensurePodDisruptionBudget(ctx context.Context) (existed bool, err error) { // ensure ndbmtd PDB if sc.pdbController == nil { // v1 policy is not supported // return true to suppress operator's "created" log return true, nil } return sc.pdbController.EnsurePodDisruptionBudget( ctx, sc, sc.ndbmtdController.GetTypeName()) } // reconcileManagementNodeStatefulSet patches the Management Node // StatefulSet with the spec from the latest generation of NdbCluster. func (sc *SyncContext) reconcileManagementNodeStatefulSet(ctx context.Context) syncResult { return sc.mgmdController.ReconcileStatefulSet(ctx, sc.mgmdNodeSfset, sc) } // reconcileDataNodeStatefulSet patches the Data Node StatefulSet // with the spec from the latest generation of NdbCluster. func (sc *SyncContext) reconcileDataNodeStatefulSet(ctx context.Context) syncResult { return sc.ndbmtdController.ReconcileStatefulSet(ctx, sc.dataNodeSfSet, sc) } // ensurePodVersion checks if the pod with the given name has the desired pod version. // If the pod is running with an old spec version, it will be deleted allowing the // statefulSet controller to restart it with the latest pod definition. func (sc *SyncContext) ensurePodVersion( ctx context.Context, namespace, podName, desiredPodVersion, podDescription string) ( podDeleted bool, err error) { // Retrieve the pod and extract its version pod, err := sc.podLister.Pods(namespace).Get(podName) if err != nil { klog.Errorf("Failed to find pod '%s/%s' running %s : %s", namespace, podName, podDescription, err) return false, err } podVersion := pod.GetLabels()["controller-revision-hash"] klog.Infof("Version of %s's pod : %s", podDescription, podVersion) if podVersion == desiredPodVersion { klog.Infof("%s has the desired version of podSpec", podDescription) return false, nil } klog.Infof("%s does not have desired version of podSpec", podDescription) // The Pod does not have the desired version. // Delete it and let the statefulset controller restart it with the latest pod definition. err = sc.kubeClientset().CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{}) if err != nil { klog.Errorf("Failed to delete pod '%s/%s' running %s : %s", namespace, podName, podDescription, err) return false, err } // The pod has been deleted. klog.Infof("Pod running %s is being restarted with the desired configuration", podDescription) return true, nil } // ensureDataNodePodVersion checks if all the Data Node pods // have the latest podSpec defined by the StatefulSet. If not, it safely // restarts them without affecting the availability of MySQL Cluster. // // The method chooses one data node per nodegroup and checks their PodSpec // version. The nodes that have an outdated PodSpec version among them // will be deleted together allowing the K8s StatefulSet controller to // restart them with the latest pod definition along with the latest // config available in the config map. When the chosen data nodes are // being restarted and updated, any further reconciliation is stopped, and // is resumed only after the restarted data nodes become ready. At any // given time, only one data node per group will be affected by this // maneuver, ensuring MySQL Cluster's availability. func (sc *SyncContext) ensureDataNodePodVersion(ctx context.Context) syncResult { ndbmtdSfset := sc.dataNodeSfSet if statefulsetUpdateComplete(ndbmtdSfset) { // All data nodes have the desired pod version. // Continue with rest of the sync process. klog.Info("All Data node pods are up-to-date and ready") return continueProcessing() } desiredPodRevisionHash := ndbmtdSfset.Status.UpdateRevision klog.Infof("Ensuring Data Node pods have the desired podSpec version, %s", desiredPodRevisionHash) // Get the node and nodegroup details via clusterStatus mgmClient, err := mgmapi.NewMgmClient(sc.ndb.GetConnectstring()) if err != nil { return errorWhileProcessing(err) } defer mgmClient.Disconnect() clusterStatus, err := mgmClient.GetStatus() if err != nil { klog.Errorf("Error getting cluster status from management server: %s", err) return errorWhileProcessing(err) } // Group the nodes based on nodegroup. // The node ids are sorted within the sub arrays and the array // itself is sorted based on the node groups. Every sub array // will have RedundancyLevel number of node ids. nodesGroupedByNodegroups := clusterStatus.GetNodesGroupedByNodegroup() if nodesGroupedByNodegroups == nil { err := fmt.Errorf("internal error: could not extract nodes and node groups from cluster status") return errorWhileProcessing(err) } // Pick up the i'th node id from every sub array of // nodesGroupedByNodegroups during every iteration and // ensure that they all have the latest Pod definition. redundancyLevel := sc.configSummary.RedundancyLevel for i := 0; i < int(redundancyLevel); i++ { // Note down the node ids to be ensured candidateNodeIds := make([]int, 0, redundancyLevel) for _, nodesInNodegroup := range nodesGroupedByNodegroups { candidateNodeIds = append(candidateNodeIds, nodesInNodegroup[i]) } // Check the pods running MySQL Cluster nodes with candidateNodeIds // and delete them if they have an older pod definition. var nodesBeingUpdated []int for _, nodeId := range candidateNodeIds { // Generate the pod name using nodeId. // Data node with nodeId 'i' runs in a pod with ordinal index 'i-1-numberOfMgmdNodes' ndbmtdPodName := fmt.Sprintf( "%s-%d", ndbmtdSfset.Name, nodeId-1-int(sc.configSummary.NumOfManagementNodes)) // Check the pod version and delete it if its outdated podDeleted, err := sc.ensurePodVersion( ctx, ndbmtdSfset.Namespace, ndbmtdPodName, desiredPodRevisionHash, fmt.Sprintf("Data Node(nodeId=%d)", nodeId)) if err != nil { return errorWhileProcessing(err) } if podDeleted { nodesBeingUpdated = append(nodesBeingUpdated, nodeId) } } if len(nodesBeingUpdated) > 0 { // The outdated data nodes are being updated. // Exit here and allow them to be restarted by the statefulset controllers. // Continue syncing once they are up, in a later reconciliation loop. klog.Infof("The data nodes %v, identified with old pod version, are being restarted", nodesBeingUpdated) // Stop processing. Reconciliation will continue // once the StatefulSet is fully ready again. return finishProcessing() } klog.Infof("The data nodes %v have the desired pod version %s", candidateNodeIds, desiredPodRevisionHash) } // Control will never reach here but to make compiler happy return continue. return continueProcessing() } // ensureAllResources creates all K8s resources required for running the // MySQL Cluster if they do no exist already. Resource creation needs to // be idempotent just like any other step in the syncHandler. The config // in the config map is used by this step rather than the spec of the // NdbCluster resource to ensure a consistent setup across multiple // reconciliation loops. func (sc *SyncContext) ensureAllResources(ctx context.Context) syncResult { var err error var resourceExists bool // create pod disruption budgets if resourceExists, err = sc.ensurePodDisruptionBudget(ctx); err != nil { return errorWhileProcessing(err) } if !resourceExists { klog.Info("Created resource : Pod Disruption Budgets") } // ensure config map var cm *corev1.ConfigMap if cm, resourceExists, err = sc.configMapController.EnsureConfigMap(ctx, sc); err != nil { return errorWhileProcessing(err) } if !resourceExists { klog.Info("Created resource : Config Map") } // Create a new ConfigSummary if sc.configSummary, err = ndbconfig.NewConfigSummary(cm.Data); err != nil { // less likely to happen as the only possible error is a config // parse error, and the configString was generated by the operator return errorWhileProcessing(err) } // First ensure that a operator password secret exists before creating statefulSet secretClient := NewMySQLUserPasswordSecretInterface(sc.kubernetesClient) if _, err := secretClient.EnsureNDBOperatorPassword(ctx, sc.ndb); err != nil { klog.Errorf("Failed to ensure ndb-operator password secret : %s", err) return errorWhileProcessing(err) } initialSystemRestart := sc.ndb.Status.ProcessedGeneration == 0 nc := sc.ndb if nc.HasSyncError() { // Patch config map if new spec is available patched, err := sc.patchConfigMap(ctx) if patched { return finishProcessing() } else if err != nil { return errorWhileProcessing(err) } } // create the management stateful set if it doesn't exist if sc.mgmdNodeSfset, resourceExists, err = sc.ensureManagementServerStatefulSet(ctx); err != nil { return errorWhileProcessing(err) } if !resourceExists { // Management statefulset was just created. klog.Info("Created resource : StatefulSet for Management Nodes") // Wait for it to become ready before starting the data nodes. // Reconciliation will continue once all the pods in the statefulset are ready. klog.Infof("Reconciliation will continue after all the management nodes are ready.") return finishProcessing() } if initialSystemRestart && !statefulsetUpdateComplete(sc.mgmdNodeSfset) { if !workloadHasConfigGeneration(sc.mgmdNodeSfset, sc.configSummary.NdbClusterGeneration) { // Note: This case will arise during initial system restart, when there is an // error applying the NDB spec and the user updated the spec to rectify the error. // Since, the Configmap is already updated, just patching the statefulset alone // should work. But, mgmd statefulset has pod management policy set to OrderedReady. // Due to an issue in k8s, patching statefulset with OrderedReady does not restart the // pod until the backoff timer expires. And if the backoff time is high, the pods will // need to wait longer to get restarted. So, manually delete the pod after patching the // Statefulset. // User updated the NDB spec and the statefulset needs to be patched. klog.Infof("A new generation of NdbCluster spec exists and the statefulset %q needs to be updated", getNamespacedName(sc.mgmdNodeSfset)) if sr := sc.reconcileManagementNodeStatefulSet(ctx); sr.stopSync() { if err := sr.getError(); err == nil { // ManagementNodeStatefulSet patched successfully klog.Infof("Delete smallest ordinal pod manually to avoid delay in restart") _, err := sc.deletePodOnStsUpdate(ctx, sc.mgmdNodeSfset, 0) return errorWhileProcessing(err) } return sr } } // Management nodes are starting for the first time, and // one of them is not ready yet which implies that this // reconciliation was triggered only to update the // NdbCluster status. No need to start data nodes and // MySQL Servers yet. return finishProcessing() } // create the data node stateful set if it doesn't exist if sc.dataNodeSfSet, resourceExists, err = sc.ensureDataNodeStatefulSet(ctx); err != nil { return errorWhileProcessing(err) } if !resourceExists { // Data nodes statefulset was just created. klog.Info("Created resource : StatefulSet for Data Nodes") // Wait for it to become ready. // Reconciliation will continue once all the pods in the statefulset are ready. klog.Infof("Reconciliation will continue after all the data nodes are ready.") return finishProcessing() } if initialSystemRestart && !statefulsetUpdateComplete(sc.dataNodeSfSet) { if !workloadHasConfigGeneration(sc.dataNodeSfSet, sc.configSummary.NdbClusterGeneration) { // User updated the NDB spec and the statefulset needs to be patched klog.Infof("A new generation of NdbCluster spec exists and the statefulset %q needs to be updated", getNamespacedName(sc.dataNodeSfSet)) if sr := sc.reconcileDataNodeStatefulSet(ctx); sr.stopSync() { return sr } } // Data nodes are starting for the first time, and some of // them are not ready yet which implies that this // reconciliation was triggered only to update the NdbCluster // status. No need to proceed further. return finishProcessing() } // MySQL Server StatefulSet will be created only if required. // For now, just verify that if it exists, it is indeed owned by the NdbCluster resource. if sc.mysqldSfset, err = sc.validateMySQLServerStatefulSet(); err != nil { return errorWhileProcessing(err) } if sc.mysqldSfset != nil && !statefulsetUpdateComplete(sc.mysqldSfset) && !nc.HasSyncError() { // MySQL Server StatefulSet exists, but it is not complete yet // which implies that this reconciliation was triggered only // to update the NdbCluster status. No need to proceed further. return finishProcessing() } // The StatefulSets already existed before this sync loop. // There is a rare chance that some other resources were created during // this sync loop as they were dropped by some other application other // than the operator. We can still continue processing in that case as // they will become immediately ready. klog.Infof("All resources exist") return continueProcessing() } // retrievePodErrors lists all the pods owned by the current NdbCluster // resource and then returns any errors encountered by them. func (sc *SyncContext) retrievePodErrors() (errs []string) { nc := sc.ndb // List all pods owned by NdbCluster resource pods, listErr := sc.podLister.Pods(nc.Namespace).List(labels.Set(nc.GetLabels()).AsSelector()) if listErr != nil { klog.Errorf("Failed to list pods owned by NdbCluster %q : %s", getNamespacedName(nc), listErr) return []string{listErr.Error()} } // Retrieve errors from all listed pods for _, pod := range pods { errs = append(errs, getPodErrors(pod)...) } return errs } // isStatefulsetUpdated checks if the given StatefulSet has the expectedConfigGeneration. // If the StatefulSet is in ready/completed state or the StatefulSet does not have the // expectedConfigGeneration it returns true. In all other cases it returns false. func (sc *SyncContext) isStatefulsetUpdated(statefulset *appsv1.StatefulSet, expectedConfigGeneration int64, completeOrReady string) bool { if statefulset == nil { return true } else if !workloadHasConfigGeneration(statefulset, expectedConfigGeneration) { // The statefulset does not have the expected generation. return true } else { if completeOrReady == Ready { return statefulsetReady(statefulset) } else if completeOrReady == Complete { return statefulsetUpdateComplete(statefulset) } klog.Infof("invalid argument: upgradeOrReady string") return false } } // ensureWorkloadsReadiness checks if all the workloads created for the // NdbCluster resource are ready. The sync is stopped if they are not ready. func (sc *SyncContext) ensureWorkloadsReadiness() syncResult { NdbGeneration := sc.configSummary.NdbClusterGeneration // The data node StatefulSet should be ready and both the mgmd and MySQL StatefulSets should // be complete if the workloads has same ndb generation as config summary. If the workloads // has different generation, then this implies that there is a spec change and hence that // particular workload needs to be patched. So, no need to wait for the stale versions if sc.isStatefulsetUpdated(sc.mgmdNodeSfset, NdbGeneration, Complete) && sc.isStatefulsetUpdated(sc.dataNodeSfSet, NdbGeneration, Ready) && sc.isStatefulsetUpdated(sc.mysqldSfset, NdbGeneration, Complete) { klog.Infof("All workloads owned by the NdbCluster resource %q are ready", getNamespacedName(sc.ndb)) return continueProcessing() } // Some workload is not ready yet => some pods are not ready yet klog.Infof("Some pods owned by the NdbCluster resource %q are not ready yet", getNamespacedName(sc.ndb)) // Stop processing. // Reconciliation will continue when all the pods are ready. return finishProcessing() } // sync updates the configuration of the MySQL Cluster running // inside the K8s Cluster based on the NdbCluster spec. This is // the core reconciliation loop, and a complete update takes // place over multiple sync calls. func (sc *SyncContext) sync(ctx context.Context) syncResult { // Multiple resources are required to start // and run the MySQL Cluster in K8s. Create // them if they do not exist yet. if sr := sc.ensureAllResources(ctx); sr.stopSync() { if err := sr.getError(); err != nil { klog.Errorf("Failed to ensure that all the required resources exist. Error : %v", err) } return sr } // All resources and workloads exist. // Continue further only if all the workloads are ready. if sr := sc.ensureWorkloadsReadiness(); sr.stopSync() { // Some workloads are not ready yet => The MySQL Cluster is not fully up yet. // Any further config changes cannot be processed until the pods are ready. return sr } // The workloads are ready => MySQL Cluster is healthy. // Before starting to handle any new changes from the Ndb // Custom object, verify that the MySQL Cluster is in sync // with the current config in the config map. This is to avoid // applying config changes midway through a previous config // change. This also means that this entire reconciliation // will be spent only on this verification. If the MySQL // Cluster has the expected config, the K8s config map will be // updated with the new config, specified by the Ndb object, // at the end of this loop. The new changes will be applied to // the MySQL Cluster starting from the next reconciliation loop. // First pass of MySQL Server reconciliation. // If any scale down was requested, it will be handled in this pass. // This is done separately to ensure that the MySQL Servers are shut // down before possibly reducing the number of API sections in config. if sr := sc.mysqldController.HandleScaleDown(ctx, sc); sr.stopSync() { return sr } // Reconcile Management Server by updating the statefulSet definition. // Management StatefulSet uses the default RollingUpdate strategy and // the update will be rolled out by the controller once the StatefulSet // is patched. if sr := sc.reconcileManagementNodeStatefulSet(ctx); sr.stopSync() { if err := sr.getError(); err == nil { // ManagementNodeStatefulSet patched successfully if sc.ndb.HasSyncError() { mgmdReplicaCount := *(sc.mgmdNodeSfset.Spec.Replicas) klog.Infof("Delete largest ordinal pod manually to avoid delay in restart") _, err := sc.deletePodOnStsUpdate(ctx, sc.mgmdNodeSfset, mgmdReplicaCount-1) return errorWhileProcessing(err) } } return sr } klog.Info("All Management node pods are up-to-date and ready") // Reconcile Data Nodes by updating their statefulSet definition if sr := sc.reconcileDataNodeStatefulSet(ctx); sr.stopSync() { return sr } // Restart Data Node pods, if required, to update their definitions if sr := sc.ensureDataNodePodVersion(ctx); sr.stopSync() { return sr } // Second pass of MySQL Server reconciliation // Reconcile the rest of spec/config change in MySQL Server StatefulSet if sr := sc.mysqldController.ReconcileStatefulSet(ctx, sc); sr.stopSync() { return sr } // Handle online add data node request if sr := sc.ndbmtdController.handleAddNodeOnline(ctx, sc); sr.stopSync() { return sr } // Reconcile the Root user if sr := sc.mysqldController.reconcileRootUser(ctx, sc); sr.stopSync() { return sr } // At this point, the MySQL Cluster is in sync with the configuration in the config map. // The configuration in the config map has to be checked to see if it is still the // desired config specified in the Ndb object. klog.Infof("The generation of the config in the configMap : \"%d\"", sc.configSummary.NdbClusterGeneration) // Check if the config map has processed the latest NdbCluster Generation patched, err := sc.patchConfigMap(ctx) if patched { // Only the config map was updated during this loop. // The next loop will actually start the sync return finishProcessing() } else if err != nil { klog.Errorf("Failed to patch the ConfigMap. Error : %v", err) return errorWhileProcessing(err) } // MySQL Cluster in sync with the NdbCluster spec sc.syncSuccess = true return finishProcessing() } // updateNdbClusterStatus updates the status of the SyncContext's NdbCluster resource in the K8s API Server func (sc *SyncContext) updateNdbClusterStatus(ctx context.Context) (statusUpdated bool, err error) { // Use the DeepCopied NdbCluster resource to make the update nc := sc.ndb // Generate status with recent state of various resources status := sc.calculateNdbClusterStatus() // Update the status. Use RetryOnConflict to automatically handle // conflicts that can occur if the spec changes between the time // the controller gets the NdbCluster object and updates it. ndbClusterInterface := sc.ndbClientset().MysqlV1().NdbClusters(sc.ndb.Namespace) err = retry.RetryOnConflict(retry.DefaultRetry, func() error { // Check if the status has changed, if not the K8s update can be skipped if statusEqual(&nc.Status, status) { // Status up-to-date. No update required. return nil } // Update the status of NdbCluster status.DeepCopyInto(&nc.Status) // Send the update to K8s server klog.Infof("Updating the NdbCluster resource %q status", getNamespacedName(sc.ndb)) _, updateErr := ndbClusterInterface.UpdateStatus(ctx, nc, metav1.UpdateOptions{}) if updateErr == nil { // Status update succeeded statusUpdated = true return nil } // Get the latest version of the NdbCluster object from // the K8s API Server for retrying the update. var getErr error nc, getErr = ndbClusterInterface.Get(ctx, sc.ndb.Name, metav1.GetOptions{}) if getErr != nil { klog.Errorf("Failed to get NdbCluster resource during status update %q: %v", getNamespacedName(sc.ndb), getErr) return getErr } // Return the updateErr. If it is a conflict error, RetryOnConflict // will retry the update with the latest version of the NdbCluster object. return updateErr }) if err != nil { klog.Errorf("Failed to update the status of NdbCluster resource %q : %v", getNamespacedName(sc.ndb), err) } return statusUpdated, err } // patchConfigMap patches the config map with the newer spec of the NdbCluster resource func (sc *SyncContext) patchConfigMap(ctx context.Context) (bool, error) { // Check if the config map has processed the latest NdbCluster Generation if sc.configSummary.NdbClusterGeneration != sc.ndb.Generation { // The Ndb object spec has changed - patch the config map klog.Info("A new generation of NdbCluster spec exists and the config map needs to be updated") if _, err := sc.configMapController.PatchConfigMap(ctx, sc); err != nil { return false, err } return true, nil } if isInitialFlagSet(sc.dataNodeSfSet) { // Patch the config map klog.Info("Initial restart of Data nodes are completed and the config map needs to be updated to remove the initial flag") if _, err := sc.configMapController.PatchConfigMap(ctx, sc); err != nil { return false, err } return true, nil } // ConfigMap already upToDate return false, nil } // deletePodOnStsUpdate deletes the pod with given pod ordinal index func (sc *SyncContext) deletePodOnStsUpdate( ctx context.Context, mgmdSfset *appsv1.StatefulSet, podOrdinalIndex int32) (podDeleted bool, err error) { namespace := mgmdSfset.Namespace podName := fmt.Sprintf( "%s-%d", mgmdSfset.Name, podOrdinalIndex) // Delete it and let the statefulset controller restart it with the latest pod definition. err = sc.kubeClientset().CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{}) if err != nil { klog.Errorf("Failed to delete pod '%s/%s' running : %s", namespace, podName, err) return false, err } klog.Infof("Successfully deleted pod '%s/%s'", namespace, podName) // The pod has been deleted. return true, nil }