pkg/resources/statefulset/ndbmtd_statefulset.go (246 lines of code) (raw):

// Copyright (c) 2022, 2023, Oracle and/or its affiliates. // // Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ package statefulset import ( "strconv" "github.com/mysql/ndb-operator/config/debug" v1 "github.com/mysql/ndb-operator/pkg/apis/ndbcontroller/v1" "github.com/mysql/ndb-operator/pkg/constants" "github.com/mysql/ndb-operator/pkg/mgmapi" "github.com/mysql/ndb-operator/pkg/ndbconfig" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" listerscorev1 "k8s.io/client-go/listers/core/v1" klog "k8s.io/klog/v2" ) var ( // Ports to be exposed by the container and service ndbmtdPorts = []int32{1186} ) // ndbmtdStatefulSet implements the NdbStatefulSetInterface to control a set of data nodes type ndbmtdStatefulSet struct { baseStatefulSet secretLister listerscorev1.SecretLister } func (nss *ndbmtdStatefulSet) NewGoverningService(nc *v1.NdbCluster) *corev1.Service { return newService(nc, ndbmtdPorts, nss.nodeType, true, false) } // getPodVolumes returns a slice of volumes to be // made available to the data node pods. func (nss *ndbmtdStatefulSet) getPodVolumes(nc *v1.NdbCluster) []corev1.Volume { // Load the data node scripts from // the configmap into the pod via a volume podVolumes := []corev1.Volume{ { Name: helperScriptsVolName, VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ Name: nc.GetConfigMapName(), }, DefaultMode: &ownerCanExecMode, Items: []corev1.KeyToPath{ { Key: constants.DataNodeStartupProbeScript, Path: constants.DataNodeStartupProbeScript, }, }, }, }, }, } // An empty directory volume needs to be provided // to the data node pods if the NdbCluster resource // doesn't have any PVCs defined to be used with // the data nodes. if nc.Spec.DataNode.PVCSpec == nil { podVolumes = append(podVolumes, *nss.getEmptyDirPodVolume(nss.getDataDirVolumeName())) } return podVolumes } // getVolumeMounts returns the volumes to be mounted to the ndbmtd containers func (nss *ndbmtdStatefulSet) getVolumeMounts() []corev1.VolumeMount { // return volume mounts return []corev1.VolumeMount{ { // Volume mount for data directory Name: nss.getDataDirVolumeName(), MountPath: dataDirectoryMountPath, }, // Volume mount for helper scripts nss.getHelperScriptVolumeMount(), // Mount the work dir volume nss.getWorkDirVolumeMount(), } } // getResourceRequestRequirements computes minimum memory required by the datanode // from the MySQL Cluster config and returns the ResourceList with the calculated memory func (nss *ndbmtdStatefulSet) getResourceRequestRequirements(nc *v1.NdbCluster) (corev1.ResourceList, error) { // Connect to the Management Server mgmClient, err := mgmapi.NewMgmClient(nc.GetConnectstring()) if err != nil { klog.Errorf("Failed to connect to Management Server : %s", err) return nil, err } // Retrieve all the config values required to compute the memory requirements dataMemory, err := mgmClient.GetDataMemory(0) if err != nil { klog.Errorf("GetDataMemory failed with error %s", err) return nil, err } maxNoOfTables, err := mgmClient.GetMaxNoOfTables(0) if err != nil { klog.Errorf("GetMaxNoOfTables failed with error %s", err) return nil, err } maxNoOfAttributes, err := mgmClient.GetMaxNoOfAttributes(0) if err != nil { klog.Errorf("GetMaxNoOfAttributes failed with error %s", err) return nil, err } maxNoOfOrderedIndexes, err := mgmClient.GetMaxNoOfOrderedIndexes(0) if err != nil { klog.Errorf("GetMaxNoOfOrderedIndexes failed with error %s", err) return nil, err } maxNoOfUniqueHashIndexes, err := mgmClient.GetMaxNoOfUniqueHashIndexes(0) if err != nil { klog.Errorf("GetMaxNoOfUniqueHashIndexes failed with error %s", err) return nil, err } maxNoOfConcurrentOperations, err := mgmClient.GetMaxNoOfConcurrentOperations(0) if err != nil { klog.Errorf("GetMaxNoOfConcurrentOperations failed with error %s", err) return nil, err } transactionBufferMemory, err := mgmClient.GetTransactionBufferMemory(0) if err != nil { klog.Errorf("GetTransactionBufferMemory failed with error %s", err) return nil, err } indexMemory, err := mgmClient.GetIndexMemory(0) if err != nil { klog.Errorf("GetIndexMemory failed with error %s", err) return nil, err } redoBuffer, err := mgmClient.GetRedoBuffer(0) if err != nil { klog.Infof("GetRedoBuffer failed with error %s", err) return nil, err } longMessageBuffer, err := mgmClient.GetLongMessageBuffer(0) if err != nil { klog.Errorf("GetLongMessageBuffer failed with error %s", err) return nil, err } diskPageBufferMemory, err := mgmClient.GetDiskPageBufferMemory(0) if err != nil { klog.Errorf("GetDiskPageBufferMemory failed with error %s", err) return nil, err } sharedGlobalMemory, err := mgmClient.GetSharedGlobalMemory(0) if err != nil { klog.Errorf("GetSharedGlobalMemory failed with error %s", err) return nil, err } transactionMemory, err := mgmClient.GetTransactionMemory(0) if err != nil { klog.Errorf("GetTransactionMemory failed with error %s", err) return nil, err } noOfFragmentLogParts, err := mgmClient.GetNoOfFragmentLogParts(0) if err != nil { klog.Errorf("GetNoOfFragmentLogParts failed with error %s", err) return nil, err } //size of the ndbmtd executable inside the pod //mysql version: 8.0.29 binarySize := uint64(12746520) if transactionMemory == 0 { transactionMemory = dataMemory / 10 } totalMemory := dataMemory + uint64(maxNoOfTables) + uint64(maxNoOfAttributes) + uint64(maxNoOfOrderedIndexes) + uint64(maxNoOfUniqueHashIndexes) + uint64(maxNoOfConcurrentOperations) + uint64(transactionBufferMemory) + indexMemory + uint64(redoBuffer*noOfFragmentLogParts) + uint64(noOfFragmentLogParts) + uint64(longMessageBuffer) + diskPageBufferMemory + sharedGlobalMemory + transactionMemory + binarySize return corev1.ResourceList{ "memory": resource.MustParse(strconv.FormatUint(totalMemory, 10)), }, nil } // getContainers returns the containers to run a data Node func (nss *ndbmtdStatefulSet) getContainers(nc *v1.NdbCluster, addInitialFlag bool) ([]corev1.Container, error) { // Command and args to run the Data node cmdAndArgs := []string{ "/usr/sbin/ndbmtd", "-c", nc.GetConnectstring(), "--foreground", // Pass the nodeId to be used to prevent invalid // nodeId allocation during statefulset patching. "--ndb-nodeid=$(cat " + NodeIdFilePath + ")", } if debug.Enabled { // Increase verbosity in debug mode cmdAndArgs = append(cmdAndArgs, "-v") } if nc.Spec.TDESecretName != "" { secret, err := nss.secretLister.Secrets(nc.Namespace).Get(nc.Spec.TDESecretName) if err != nil { // Secret does not exist klog.Errorf("Failed to retrieve Secret %q : %s", nc.Spec.TDESecretName, err) return nil, err } pass := string(secret.Data[corev1.BasicAuthPasswordKey]) cmdAndArgs = append(cmdAndArgs, "--filesystem-password="+pass) } if addInitialFlag { cmdAndArgs = append(cmdAndArgs, "--initial") } ndbmtdContainer := nss.createContainer( nc, nss.getContainerName(false), cmdAndArgs, nss.getVolumeMounts(), ndbmtdPorts) // Setup startup probe for data nodes. // The probe uses a script that checks if a data node has started, by // connecting to the Management node via ndb_mgm. This implies that atleast // one Management node has to be available for the probe to succeed. This // is fine as that is already a requirement for a data node to start. Even // if the Management node crashes or becomes unavailable when the data node // is going through the start phases, the Management node will be // rescheduled immediately and will become ready within a few seconds, // enabling the data node startup probe to succeed. ndbmtdContainer.StartupProbe = &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ Exec: &corev1.ExecAction{ // ndbmtd-startup-probe.sh Command: []string{ "/bin/bash", helperScriptsMountPath + "/" + constants.DataNodeStartupProbeScript, }, }, }, // expect data node to get ready within 15 minutes PeriodSeconds: 2, TimeoutSeconds: 2, FailureThreshold: 450, } // Set resource request to data node container resList, err := nss.getResourceRequestRequirements(nc) if err == nil { ndbmtdContainer.Resources = corev1.ResourceRequirements{ Requests: resList, } } else { klog.Warningf("Failed to set ResourceRequirements to %s", ndbmtdContainer.Name) } return []corev1.Container{ndbmtdContainer}, nil } func (nss *ndbmtdStatefulSet) getPodAntiAffinity() *corev1.PodAntiAffinity { // Default pod AntiAffinity rules for Data Nodes return GetPodAntiAffinityRules([]constants.NdbNodeType{ constants.NdbNodeTypeMgmd, constants.NdbNodeTypeMySQLD, constants.NdbNodeTypeNdbmtd, }) } // NewStatefulSet returns the StatefulSet specification to start and manage the Data nodes. func (nss *ndbmtdStatefulSet) NewStatefulSet(cs *ndbconfig.ConfigSummary, nc *v1.NdbCluster) (*appsv1.StatefulSet, error) { var err error statefulSet := nss.newStatefulSet(nc, cs) statefulSetSpec := &statefulSet.Spec // Fill in ndbmtd specific values replicas := cs.NumOfDataNodes statefulSetSpec.Replicas = &replicas // Set pod management policy to start Data nodes in parallel statefulSetSpec.PodManagementPolicy = appsv1.ParallelPodManagement // Use the legacy OnDelete update strategy to get more // control over how the update is rolled out to data nodes statefulSetSpec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{ Type: appsv1.OnDeleteStatefulSetStrategyType, } // Add VolumeClaimTemplate if data node PVC Spec exists if nc.Spec.DataNode.PVCSpec != nil { statefulSetSpec.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{ // This PVC will be used as a template and an actual PVC will be created by the // statefulset controller with name "<data-dir-vol-name(i.e ndbmtd-data-vol)>-<pod-name>" *newPVC(nc, nss.getDataDirVolumeName(), nc.Spec.DataNode.PVCSpec), } } // Update template pod spec podSpec := &statefulSetSpec.Template.Spec podSpec.Containers, err = nss.getContainers(nc, cs.DataNodeInitialRestart) if err != nil { klog.Errorf("Failed to get containers for the statefulset %s", statefulSet.Name) return nil, err } podSpec.Volumes = append(podSpec.Volumes, nss.getPodVolumes(nc)...) // Set default AntiAffinity rules podSpec.Affinity = &corev1.Affinity{ PodAntiAffinity: nss.getPodAntiAffinity(), } // Copy down any podSpec specified via CRD CopyPodSpecFromNdbPodSpec(podSpec, nc.Spec.DataNode.NdbPodSpec) return statefulSet, nil } // NewNdbmtdStatefulSet returns a new NdbStatefulSetInterface for data nodes func NewNdbmtdStatefulSet(secretLister listerscorev1.SecretLister) NdbStatefulSetInterface { return &ndbmtdStatefulSet{ baseStatefulSet{ nodeType: constants.NdbNodeTypeNdbmtd, }, secretLister, } }