pkg/controllers/statefulset_control.go (154 lines of code) (raw):

// Copyright (c) 2020, 2024, Oracle and/or its affiliates. // // Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ package controllers import ( "context" "encoding/json" "github.com/mysql/ndb-operator/pkg/constants" "github.com/mysql/ndb-operator/pkg/resources/statefulset" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes" typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" appslisters "k8s.io/client-go/listers/apps/v1" klog "k8s.io/klog/v2" ) // ndbNodeStatefulSetImpl implements the NdbStatefulSetControlInterface to manage MySQL Cluster data nodes type ndbNodeStatefulSetImpl struct { client kubernetes.Interface statefulSetLister appslisters.StatefulSetLister ndbNodeStatefulset statefulset.NdbStatefulSetInterface } // newMgmdStatefulSetController creates a new ndbNodeStatefulSetImpl for Management Nodes func newMgmdStatefulSetController( client kubernetes.Interface, statefulSetLister appslisters.StatefulSetLister) *ndbNodeStatefulSetImpl { return &ndbNodeStatefulSetImpl{ client: client, statefulSetLister: statefulSetLister, ndbNodeStatefulset: statefulset.NewMgmdStatefulSet(), } } // statefulSetInterface returns a typed/apps/v1.StatefulSetInterface func (ndbSfset *ndbNodeStatefulSetImpl) statefulSetInterface(namespace string) typedappsv1.StatefulSetInterface { return ndbSfset.client.AppsV1().StatefulSets(namespace) } // GetTypeName returns the type of the statefulSetInterface // being controlled by the NdbStatefulSetControlInterface func (ndbSfset *ndbNodeStatefulSetImpl) GetTypeName() string { return ndbSfset.ndbNodeStatefulset.GetTypeName() } // GetStatefulSet retrieves the StatefulSet owned by the given NdbCluster resource func (ndbSfset *ndbNodeStatefulSetImpl) GetStatefulSet(sc *SyncContext) (*appsv1.StatefulSet, error) { // Get the StatefulSet from cache using statefulSetLister nc := sc.ndb sfsetName := ndbSfset.ndbNodeStatefulset.GetName(nc) sfset, err := ndbSfset.statefulSetLister.StatefulSets(nc.Namespace).Get(sfsetName) if err != nil && !errors.IsNotFound(err) { klog.Errorf("Failed to retrieve the StatefulSet %q for NdbCluster %q : %s", sfsetName, getNamespacedName(nc), err) return nil, err } if errors.IsNotFound(err) { // StatefulSet doesn't exist yet return nil, nil } // StatefulSet exists. Verify ownership if err = sc.isOwnedByNdbCluster(sfset); err != nil { // StatefulSet is not owned by the current NdbCluster resource return nil, err } return sfset, nil } // createStatefulSet creates the statefulSet returned by statefulset.NdbStatefulSetInterface in K8s func (ndbSfset *ndbNodeStatefulSetImpl) createStatefulSet( ctx context.Context, sc *SyncContext) (sfset *appsv1.StatefulSet, err error) { // Create the service to be used by the statefulset if _, err = sc.serviceController.ensureService(ctx, sc, ndbSfset.ndbNodeStatefulset); err != nil { return nil, err } // Create the service account to be used by the statefulset if _, err = sc.serviceaccountController.ensureServiceAccount(ctx, sc); err != nil { return nil, err } // Create StatefulSet nc := sc.ndb sfsetName := ndbSfset.ndbNodeStatefulset.GetName(nc) sfset, err = ndbSfset.ndbNodeStatefulset.NewStatefulSet(sc.configSummary, nc) if err != nil { return nil, err } klog.Infof("Creating StatefulSet %q of type %q with Replica = %d", getNamespacedName2(nc.Namespace, sfsetName), ndbSfset.ndbNodeStatefulset.GetTypeName(), *sfset.Spec.Replicas) sfsetInterface := ndbSfset.statefulSetInterface(nc.Namespace) sfset, err = sfsetInterface.Create(ctx, sfset, metav1.CreateOptions{}) if err != nil { if errors.IsAlreadyExists(err) { // The StatefulSet was created already but the cache // didn't have it yet. This also implies that the // statefulset is not ready yet. Return err = nil // to make the sync handler stop processing. The sync // will continue when the statefulset becomes ready. return nil, nil } // Unexpected error. Failed to create the resource. klog.Errorf("Failed to create StatefulSet %q : %s", getNamespacedName2(nc.Namespace, sfsetName), err) return nil, err } // New StatefulSet was successfully created return sfset, nil } // deleteStatefulSet deletes the given statefulSet func (ndbSfset *ndbNodeStatefulSetImpl) deleteStatefulSet(ctx context.Context, sfset *appsv1.StatefulSet, sc *SyncContext) error { err := ndbSfset.statefulSetInterface(sfset.Namespace).Delete(ctx, sfset.Name, metav1.DeleteOptions{}) if err != nil && !errors.IsNotFound(err) { // Delete failed with an error. // Ignore NotFound error as this delete might be a redundant // step, caused by an outdated cache read. klog.Errorf("Failed to delete the StatefulSet %q : %s", getNamespacedName(sfset), err) return err } klog.Errorf("Deleted the StatefulSet %q", getNamespacedName(sfset)) // Delete the governing Service return sc.serviceController.deleteService(ctx, sfset.Namespace, sc.ndb.GetServiceName(ndbSfset.GetTypeName())) } // EnsureStatefulSet creates a StatefulSet for the MySQL Cluster nodes if one doesn't exist already. func (ndbSfset *ndbNodeStatefulSetImpl) EnsureStatefulSet( ctx context.Context, sc *SyncContext) (sfset *appsv1.StatefulSet, existed bool, err error) { if sfset, err = ndbSfset.GetStatefulSet(sc); err != nil { // Error retrieving sfset return nil, false, err } else if sfset != nil { // StatefulSet exists and is owned by the NdbCluster resource return sfset, true, nil } // StatefulSet doesn't exist. Create it. sfset, err = ndbSfset.createStatefulSet(ctx, sc) return sfset, false, err } // patchStatefulSet patches the statefulSet func (ndbSfset *ndbNodeStatefulSetImpl) patchStatefulSet(ctx context.Context, existingStatefulSet *appsv1.StatefulSet, updatedStatefulSet *appsv1.StatefulSet) syncResult { // JSON encode both StatefulSets existingJSON, err := json.Marshal(existingStatefulSet) if err != nil { klog.Errorf("Failed to encode existing StatefulSet: %v", err) return errorWhileProcessing(err) } updatedJSON, err := json.Marshal(updatedStatefulSet) if err != nil { klog.Errorf("Failed to encode updated StatefulSet: %v", err) return errorWhileProcessing(err) } // Generate the patch to be applied patch, err := strategicpatch.CreateTwoWayMergePatch(existingJSON, updatedJSON, appsv1.StatefulSet{}) if err != nil { klog.Errorf("Failed to generate the patch to be applied: %v", err) return errorWhileProcessing(err) } // klog.Infof("Patching StatefulSets.\nExisting : %v\n. Modified : %v\nPatch : %v", string(existingJSON), string(updatedJSON), string(patch)) // Patch the StatefulSet sfsetInterface := ndbSfset.statefulSetInterface(existingStatefulSet.Namespace) updatedStatefulSet, err = sfsetInterface.Patch( ctx, existingStatefulSet.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}) if err != nil { klog.Errorf("Failed to apply the patch to the StatefulSet %q : %s", getNamespacedName(existingStatefulSet), err) return errorWhileProcessing(err) } // Successfully applied the patch - StatefulSet has been updated. klog.Infof("StatefulSet %q has been patched successfully", getNamespacedName(updatedStatefulSet)) if updatedStatefulSet.Generation == existingStatefulSet.Generation { // No changes to StatefulSet spec. Only annotations/labels were updated. // Continue processing return continueProcessing() } // StatefulSet was patched successfully. // // For the management node StatefulSet, the controller will // handle rolling out the update to all the pods. Reconciliation // will continue once the update has been rolled out, so finish // processing. // // For the data node StatefulSet, the operator will handle // rolling out the update over multiple reconciliation loops. // The controller will update the status of the data node // StatefulSet and that will immediately trigger the next // reconciliation loop, so finish processing. return finishProcessing() } func (ndbSfset *ndbNodeStatefulSetImpl) ReconcileStatefulSet( ctx context.Context, sfset *appsv1.StatefulSet, sc *SyncContext) syncResult { cs := sc.configSummary if workloadHasConfigGeneration(sfset, cs.NdbClusterGeneration) { // Check if it is the second iteration of the sync handler. In the first iteration, // the --initial flag will be added to the data node pods, and the ConfigMap will // be patched to remove the DataNodeInitialRestart field to trigger the second iteration. // During this second iteration, the change will be noted by the statefulset, and it will // be patched again to remove the --initial flag from the data node pod's command argument. if !(ndbSfset.GetTypeName() == constants.NdbNodeTypeNdbmtd && isInitialFlagSet(sfset) && !sc.configSummary.DataNodeInitialRestart) { // StatefulSet upto date return continueProcessing() } } // StatefulSet has to be patched // Patch the Governing Service first if err := sc.serviceController.patchService(ctx, sc, ndbSfset.ndbNodeStatefulset); err != nil { return errorWhileProcessing(err) } // Patch the StatefulSet nc := sc.ndb updatedStatefulSet, err := ndbSfset.ndbNodeStatefulset.NewStatefulSet(cs, nc) if err != nil { return errorWhileProcessing(err) } if ndbSfset.GetTypeName() == constants.NdbNodeTypeNdbmtd && *(sfset.Spec.Replicas) < *(updatedStatefulSet.Spec.Replicas) { // New data nodes are being added to MySQL Cluster // config but do not start the new nodes yet. *(updatedStatefulSet.Spec.Replicas) = *(sfset.Spec.Replicas) // Set the AddNodeOnlineInProgress Annotation updatedStatefulSet.Annotations[AddNodeOnlineInProgress] = "true" } return ndbSfset.patchStatefulSet(ctx, sfset, updatedStatefulSet) }