func NewController()

in pkg/controllers/controller.go [69:330]
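
NewController constructs the operator's core controller: it registers the required shared informers, collects their HasSynced functions for the initial cache sync, wires up the listers, the per-resource control objects, the rate-limited workqueue and the event recorder, and installs the event handlers that enqueue NdbCluster keys for reconciliation.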


func NewController(
	kubernetesClient kubernetes.Interface,
	ndbClient ndbclientset.Interface,
	k8sSharedIndexInformer kubeinformers.SharedInformerFactory,
	ndbSharedIndexInformer ndbinformers.SharedInformerFactory) *Controller {

	// Register all the required informers with the shared informer factories
	ndbClusterInformer := ndbSharedIndexInformer.Mysql().V1().NdbClusters()
	statefulSetInformer := k8sSharedIndexInformer.Apps().V1().StatefulSets()
	podInformer := k8sSharedIndexInformer.Core().V1().Pods()
	serviceInformer := k8sSharedIndexInformer.Core().V1().Services()
	configmapInformer := k8sSharedIndexInformer.Core().V1().ConfigMaps()
	secretInformer := k8sSharedIndexInformer.Core().V1().Secrets()
	serviceAccountInformer := k8sSharedIndexInformer.Core().V1().ServiceAccounts()
	pvcInformer := k8sSharedIndexInformer.Core().V1().PersistentVolumeClaims()

	// Extract all the InformerSynced methods
	informerSyncedMethods := []cache.InformerSynced{
		ndbClusterInformer.Informer().HasSynced,
		statefulSetInformer.Informer().HasSynced,
		podInformer.Informer().HasSynced,
		serviceInformer.Informer().HasSynced,
		configmapInformer.Informer().HasSynced,
		secretInformer.Informer().HasSynced,
		serviceAccountInformer.Informer().HasSynced,
		pvcInformer.Informer().HasSynced,
	}
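
	// These HasSynced functions are typically passed to cache.WaitForCacheSync
	// in the controller's run loop (outside this excerpt), so that workers
	// start only after every informer cache has completed its initial sync.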

	serviceLister := serviceInformer.Lister()
	statefulSetLister := statefulSetInformer.Lister()
	configmapLister := configmapInformer.Lister()
	secretLister := secretInformer.Lister()
	serviceAccountLister := serviceAccountInformer.Lister()

	controller := &Controller{
		kubernetesClient:         kubernetesClient,
		ndbClient:                ndbClient,
		informerSyncedMethods:    informerSyncedMethods,
		ndbsLister:               ndbClusterInformer.Lister(),
		podLister:                podInformer.Lister(),
		pvcLister:                pvcInformer.Lister(),
		configMapController:      NewConfigMapControl(kubernetesClient, configmapLister),
		serviceController:        NewServiceControl(kubernetesClient, serviceLister),
		serviceAccountController: NewServiceAccountControl(kubernetesClient, serviceAccountLister),
		workqueue:                workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Ndbs"),
		recorder:                 newEventRecorder(kubernetesClient),

		mgmdController:   newMgmdStatefulSetController(kubernetesClient, statefulSetLister),
		ndbmtdController: newNdbmtdStatefulSetController(kubernetesClient, statefulSetLister, secretLister),
		mysqldController: newMySQLDStatefulSetController(
			kubernetesClient, statefulSetLister, configmapLister),
	}
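
	// Each control object above is handed only the client and listers it
	// needs; the create/update logic for the respective resource type
	// presumably lives behind those constructors (outside this excerpt).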

	// Set up the informer and control object for policy/v1 PodDisruptionBudgets if the K8s server supports the API
	if ServerSupportsV1Policy(kubernetesClient) {
		pdbInformer := k8sSharedIndexInformer.Policy().V1().PodDisruptionBudgets()
		controller.informerSyncedMethods = append(controller.informerSyncedMethods, pdbInformer.Informer().HasSynced)
		controller.pdbController = newPodDisruptionBudgetControl(kubernetesClient, pdbInformer.Lister())
	}
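
	// Note: on servers without policy/v1 support, pdbController remains nil,
	// so the sync logic is expected to skip PodDisruptionBudget handling there.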

	klog.Info("Setting up event handlers")
	// Set up event handler for NdbCluster resource changes
	ndbClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{

		AddFunc: func(obj interface{}) {
			ndb := obj.(*v1.NdbCluster)
			ndbKey := getNdbClusterKey(ndb)
			klog.Infof("New NdbCluster resource added : %q, queueing it for reconciliation", ndbKey)
			controller.workqueue.Add(ndbKey)
		},

		UpdateFunc: func(old, new interface{}) {
			oldNdb := old.(*v1.NdbCluster)
			ndbKey := getNdbClusterKey(oldNdb)

			newNdb := new.(*v1.NdbCluster)
			if oldNdb.Generation != newNdb.Generation {
				// Spec of the NdbCluster resource was updated.
				klog.Infof("Spec of the NdbCluster resource %q was updated", ndbKey)
				klog.Infof("Generation updated from %d -> %d",
					oldNdb.Generation, newNdb.Generation)
				klog.Infof("Resource version updated from %s -> %s",
					oldNdb.ResourceVersion, newNdb.ResourceVersion)
				klog.Infof("NdbCluster resource %q is added to the queue for reconciliation", ndbKey)
			} else if oldNdb.ResourceVersion != newNdb.ResourceVersion {
				// Spec was not updated but the ResourceVersion changed => Status update
				klog.V(2).Infof("Status of the NdbCluster resource '%s' was updated", ndbKey)
				klog.V(2).Infof("Resource version updated from %s -> %s",
					oldNdb.ResourceVersion, newNdb.ResourceVersion)
				klog.V(2).Info("Nothing to do as only the status was updated.")
				return
			} else {
				// NdbCluster resource was not updated and this is a resync/requeue.
				if oldNdb.Generation != oldNdb.Status.ProcessedGeneration {
					// Reconciliation is already underway. Either it is being handled by a
					// worker right now or the controller is waiting for some resource to get
					// ready, and it will continue once it is ready.
					// So, no need to add the item to the workqueue now.
					return
				}
				klog.Infof("No updates to NdbCluster resource %q, queueing it for periodic reconciliation", ndbKey)
			}
			controller.workqueue.Add(ndbKey)
		},
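
		// Note: the rate-limited workqueue deduplicates keys, so repeated
		// Adds of the same NdbCluster key before a worker picks it up
		// collapse into a single reconciliation.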

		DeleteFunc: func(obj interface{}) {
			// The various K8s resources created and maintained for this
			// NdbCluster resource will have proper owner references set up.
			// Due to that, this delete will automatically be cascaded to
			// all those resources and the controller doesn't have to do
			// anything for them.
			ndb := obj.(*v1.NdbCluster)

			klog.Infof("NdbCluster resource '%s' was deleted", getNdbClusterKey(ndb))

			// List all the PVCs owned by the NdbCluster resource. PVCs created
			// from the StatefulSets' volumeClaimTemplates are not cascade-deleted
			// via owner references, so they have to be cleaned up explicitly.
			pvcs, listErr := controller.pvcLister.List(labels.Set(ndb.GetLabels()).AsSelector())
			if listErr != nil {
				klog.Errorf("Failed to list pvc's owned by NdbCluster %q : %s", getNamespacedName(ndb), listErr)
			}

			// Delete all the listed PVCs
			for _, pvc := range pvcs {
				err := controller.kubernetesClient.CoreV1().PersistentVolumeClaims(ndb.Namespace).Delete(context.Background(), pvc.Name, metav1.DeleteOptions{})
				if err != nil && !errors.IsNotFound(err) {
					// Delete failed with an error.
					klog.Errorf("Failed to delete pvc %q : %s", pvc, err)
				}
			}
		},
	})

	// Set up event handlers for StatefulSet resource changes
	statefulSetInformer.Informer().AddEventHandlerWithResyncPeriod(

		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				// Filter out all StatefulSets not owned by any
				// NdbCluster resources. The StatefulSet labels
				// will have the names of their respective
				// NdbCluster owners.
				sfset := obj.(*appsv1.StatefulSet)
				_, clusterLabelExists := sfset.GetLabels()[constants.ClusterLabel]
				return clusterLabelExists
			},

			Handler: cache.ResourceEventHandlerFuncs{
				// When a StatefulSet owned by an NdbCluster resource
				// is updated, either
				//  a) it has become ready, in which case the next
				//     reconciliation loop has to be started, or
				//  b) one or more of its pods have become ready/unready,
				//     in which case the status of the NdbCluster resource
				//     needs to be updated, which also happens through a
				//     reconciliation loop.
				// So, add the NdbCluster item to the workqueue in both cases.
				UpdateFunc: func(oldObj, newObj interface{}) {
					oldStatefulSet := oldObj.(*appsv1.StatefulSet)
					newStatefulSet := newObj.(*appsv1.StatefulSet)

					if reflect.DeepEqual(oldStatefulSet.Status, newStatefulSet.Status) {
						// No updates to status => this event was triggered
						// by an update to the StatefulSet spec by the operator.
						// No need to enqueue for reconciliation.
						return
					}

					if oldStatefulSet.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType {
						oldStatus, newStatus := oldStatefulSet.Status, newStatefulSet.Status
						if (oldStatus.CurrentReplicas != newStatus.CurrentReplicas ||
							oldStatus.CurrentRevision != newStatus.CurrentRevision) &&
							oldStatus.UpdatedReplicas == newStatus.UpdatedReplicas &&
							oldStatus.UpdateRevision == newStatus.UpdateRevision &&
							oldStatus.ReadyReplicas == newStatus.ReadyReplicas &&
							oldStatus.Replicas == newStatus.Replicas &&
							oldStatus.ObservedGeneration == newStatus.ObservedGeneration {
							// https://github.com/kubernetes/kubernetes/issues/106055
							// CurrentRevision is not updated when the OnDelete update
							// strategy is used, but CurrentReplicas gets erroneously
							// updated on some occasions. Ignore any status update made
							// only to those fields when the UpdateStrategy is OnDelete.
							return
						}
					}

					controller.extractAndEnqueueNdbCluster(newStatefulSet, "StatefulSet", "updated")
				},

				// When a StatefulSet owned by an NdbCluster resource
				// is deleted, add the NdbCluster resource to the
				// workqueue to start the next reconciliation loop.
				DeleteFunc: func(obj interface{}) {
					sfset := obj.(*appsv1.StatefulSet)
					klog.Infof("StatefulSet %q is deleted", getNamespacedName(sfset))
					controller.extractAndEnqueueNdbCluster(sfset, "StatefulSet", "deleted")
				},
			},
		},

		// Set resyncPeriod to 0 to ignore all re-sync events
		0,
	)

	// Set up event handlers for ConfigMap updates
	configmapInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				// Filter out all ConfigMaps not owned by any NdbCluster resources.
				// The ConfigMap labels will have the names of their respective
				// NdbCluster owners.
				configmap := obj.(*corev1.ConfigMap)
				_, clusterLabelExists := configmap.GetLabels()[constants.ClusterLabel]
				return clusterLabelExists
			},

			Handler: cache.ResourceEventHandlerFuncs{
				// A ConfigMap owned by an NdbCluster object was updated.
				// Requeue the owner for reconciliation.
				UpdateFunc: func(oldObj, newObj interface{}) {
					newConfigMap := newObj.(*corev1.ConfigMap)
					controller.extractAndEnqueueNdbCluster(newConfigMap, "ConfigMap", "updated")
				},
			},
		},

		// Set resyncPeriod to 0 to ignore all re-sync events
		0,
	)

	// Set up event handlers for Pod Status changes
	podInformer.Informer().AddEventHandlerWithResyncPeriod(

		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				// Filter out all Pods not owned by any NdbCluster resources.
				// The Pod labels will have the names of their respective
				// NdbCluster owners.
				pod := obj.(*corev1.Pod)
				_, clusterLabelExists := pod.GetLabels()[constants.ClusterLabel]
				return clusterLabelExists
			},

			Handler: cache.ResourceEventHandlerFuncs{
				// When a pod owned by an NdbCluster resource fails or
				// recovers from an error, the NdbCluster status needs to
				// be updated. getPodErrors, presumably a helper defined
				// elsewhere in this package, extracts the Pod's error state,
				// and the owner is enqueued only when that state changes.
				UpdateFunc: func(oldObj, newObj interface{}) {
					oldPod := oldObj.(*corev1.Pod)
					newPod := newObj.(*corev1.Pod)

					if !reflect.DeepEqual(getPodErrors(oldPod), getPodErrors(newPod)) {
						// The error status of the Pod has changed.
						controller.extractAndEnqueueNdbCluster(newPod, "Pod", "updated")
					}
				},
			},
		},

		// Set resyncPeriod to 0 to ignore all re-sync events
		0,
	)

	return controller
}
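
For context, the sketch below shows how NewController is typically wired up from an operator entry point. It is a minimal illustration, not the actual startup code of this repo: cfg, ctx and the trailing placeholder are assumptions, while the clientset and informer-factory constructors are the standard ones implied by the function's signature.

// Hypothetical wiring; assumes cfg is a *rest.Config and ctx a context.Context.
kubernetesClient := kubernetes.NewForConfigOrDie(cfg)
ndbClient := ndbclientset.NewForConfigOrDie(cfg)

// A resync period of 0 disables factory-wide periodic resyncs; the
// StatefulSet, ConfigMap and Pod handlers above additionally opt out
// of per-handler resyncs.
k8sFactory := kubeinformers.NewSharedInformerFactory(kubernetesClient, 0)
ndbFactory := ndbinformers.NewSharedInformerFactory(ndbClient, 0)

controller := NewController(kubernetesClient, ndbClient, k8sFactory, ndbFactory)

// Start the informers only after NewController has registered its event
// handlers, then hand over to the controller's run loop (outside this
// excerpt), which should wait on informerSyncedMethods before starting
// any workers.
k8sFactory.Start(ctx.Done())
ndbFactory.Start(ctx.Done())
_ = controller // the controller's run method is not part of this excerpt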