in pkg/controllers/controller.go [69:330]
// NewController creates a Controller for NdbCluster resources.
//
// It obtains informers for NdbClusters, StatefulSets, Pods, Services,
// ConfigMaps, Secrets, ServiceAccounts and PersistentVolumeClaims from the
// given shared informer factories (plus v1 PodDisruptionBudgets when
// ServerSupportsV1Policy reports support), records their HasSynced methods
// in the returned controller, builds the per-resource sub controllers and
// installs the event handlers that add NdbCluster keys to the workqueue.
//
// The informer factories are not started by this function.
func NewController(
	kubernetesClient kubernetes.Interface,
	ndbClient ndbclientset.Interface,
	k8sSharedIndexInformer kubeinformers.SharedInformerFactory,
	ndbSharedIndexInformer ndbinformers.SharedInformerFactory) *Controller {

	// Register for all the required informers
	ndbClusterInformer := ndbSharedIndexInformer.Mysql().V1().NdbClusters()
	statefulSetInformer := k8sSharedIndexInformer.Apps().V1().StatefulSets()
	podInformer := k8sSharedIndexInformer.Core().V1().Pods()
	serviceInformer := k8sSharedIndexInformer.Core().V1().Services()
	configmapInformer := k8sSharedIndexInformer.Core().V1().ConfigMaps()
	secretInformer := k8sSharedIndexInformer.Core().V1().Secrets()
	serviceAccountInformer := k8sSharedIndexInformer.Core().V1().ServiceAccounts()
	pvcInformer := k8sSharedIndexInformer.Core().V1().PersistentVolumeClaims()

	// Extract all the InformerSynced methods
	informerSyncedMethods := []cache.InformerSynced{
		ndbClusterInformer.Informer().HasSynced,
		statefulSetInformer.Informer().HasSynced,
		podInformer.Informer().HasSynced,
		serviceInformer.Informer().HasSynced,
		configmapInformer.Informer().HasSynced,
		secretInformer.Informer().HasSynced,
		serviceAccountInformer.Informer().HasSynced,
		pvcInformer.Informer().HasSynced,
	}

	serviceLister := serviceInformer.Lister()
	statefulSetLister := statefulSetInformer.Lister()
	configmapLister := configmapInformer.Lister()
	secretLister := secretInformer.Lister()
	serviceAccountLister := serviceAccountInformer.Lister()

	controller := &Controller{
		kubernetesClient:         kubernetesClient,
		ndbClient:                ndbClient,
		informerSyncedMethods:    informerSyncedMethods,
		ndbsLister:               ndbClusterInformer.Lister(),
		podLister:                podInformer.Lister(),
		pvcLister:                pvcInformer.Lister(),
		configMapController:      NewConfigMapControl(kubernetesClient, configmapLister),
		serviceController:        NewServiceControl(kubernetesClient, serviceLister),
		serviceAccountController: NewServiceAccountControl(kubernetesClient, serviceAccountLister),
		workqueue:                workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Ndbs"),
		recorder:                 newEventRecorder(kubernetesClient),
		mgmdController:           newMgmdStatefulSetController(kubernetesClient, statefulSetLister),
		ndbmtdController:         newNdbmtdStatefulSetController(kubernetesClient, statefulSetLister, secretLister),
		mysqldController: newMySQLDStatefulSetController(
			kubernetesClient, statefulSetLister, configmapLister),
	}

	// Setup informer and controller for v1.PDB if K8s Server has the support
	if ServerSupportsV1Policy(kubernetesClient) {
		pdbInformer := k8sSharedIndexInformer.Policy().V1().PodDisruptionBudgets()
		controller.informerSyncedMethods = append(controller.informerSyncedMethods, pdbInformer.Informer().HasSynced)
		controller.pdbController = newPodDisruptionBudgetControl(kubernetesClient, pdbInformer.Lister())
	}

	// unwrapTombstone returns the last known state of a deleted object when
	// the informer hands it over wrapped in a cache.DeletedFinalStateUnknown.
	// Any other object is returned unchanged.
	unwrapTombstone := func(obj interface{}) interface{} {
		if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
			return tombstone.Obj
		}
		return obj
	}

	// hasClusterLabel is the filter shared by the StatefulSet, ConfigMap and
	// Pod handlers. It drops all objects not owned by any NdbCluster
	// resource; owned objects carry the ClusterLabel.
	// FilteringResourceEventHandler also runs the filter on delete events,
	// so the object may be a tombstone and has to be unwrapped first.
	hasClusterLabel := func(obj interface{}) bool {
		metaObj, ok := unwrapTombstone(obj).(metav1.Object)
		if !ok {
			return false
		}
		_, clusterLabelExists := metaObj.GetLabels()[constants.ClusterLabel]
		return clusterLabelExists
	}

	klog.Info("Setting up event handlers")

	// Set up event handler for NdbCluster resource changes
	ndbClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ndb := obj.(*v1.NdbCluster)
			ndbKey := getNdbClusterKey(ndb)
			klog.Infof("New NdbCluster resource added : %q, queueing it for reconciliation", ndbKey)
			controller.workqueue.Add(ndbKey)
		},

		UpdateFunc: func(oldObj, newObj interface{}) {
			oldNdb := oldObj.(*v1.NdbCluster)
			newNdb := newObj.(*v1.NdbCluster)
			ndbKey := getNdbClusterKey(oldNdb)

			switch {
			case oldNdb.Generation != newNdb.Generation:
				// Spec of the NdbCluster resource was updated.
				klog.Infof("Spec of the NdbCluster resource %q was updated", ndbKey)
				klog.Infof("Generation updated from %d -> %d",
					oldNdb.Generation, newNdb.Generation)
				klog.Infof("Resource version updated from %s -> %s",
					oldNdb.ResourceVersion, newNdb.ResourceVersion)
				klog.Infof("NdbCluster resource %q is added to the queue for reconciliation", ndbKey)

			case oldNdb.ResourceVersion != newNdb.ResourceVersion:
				// Spec was not updated but the ResourceVersion changed => Status update
				klog.V(2).Infof("Status of the NdbCluster resource '%s' was updated", ndbKey)
				klog.V(2).Infof("Resource version updated from %s -> %s",
					oldNdb.ResourceVersion, newNdb.ResourceVersion)
				klog.V(2).Info("Nothing to do as only the status was updated.")
				return

			case oldNdb.Generation != oldNdb.Status.ProcessedGeneration:
				// NdbCluster resource was not updated and this is a resync/requeue,
				// but a reconciliation is already underway. Either it is being
				// handled by a worker right now or the controller is waiting for
				// some resource to get ready, and it will continue once it is ready.
				// So, no need to add the item to the workqueue now.
				return

			default:
				// NdbCluster resource was not updated and this is a resync/requeue.
				klog.Infof("No updates to NdbCluster resource %q, queueing it for periodic reconciliation", ndbKey)
			}

			controller.workqueue.Add(ndbKey)
		},

		DeleteFunc: func(obj interface{}) {
			// Various K8s resources created and maintained for this NdbCluster
			// resource will have proper owner resources setup. Due to that, this
			// delete will automatically be cascaded to all those resources.
			// The PVCs matching the NdbCluster's labels are deleted explicitly
			// below.
			ndb, ok := unwrapTombstone(obj).(*v1.NdbCluster)
			if !ok {
				klog.Errorf("Ignoring NdbCluster delete event with unexpected object type %T", obj)
				return
			}
			klog.Infof("NdbCluster resource '%s' was deleted", getNdbClusterKey(ndb))

			// List all pvcs owned by NdbCluster resource
			pvcs, listErr := controller.pvcLister.List(labels.Set(ndb.GetLabels()).AsSelector())
			if listErr != nil {
				klog.Errorf("Failed to list pvc's owned by NdbCluster %q : %s", getNamespacedName(ndb), listErr)
				return
			}

			// Delete all pvc
			pvcInterface := controller.kubernetesClient.CoreV1().PersistentVolumeClaims(ndb.Namespace)
			for _, pvc := range pvcs {
				if pvc.Namespace != ndb.Namespace {
					// The List call above is not restricted to the namespace
					// of the NdbCluster. Skip PVCs from other namespaces that
					// happen to carry the same labels, as the delete below is
					// issued by name in the NdbCluster's namespace.
					continue
				}
				err := pvcInterface.Delete(context.Background(), pvc.Name, metav1.DeleteOptions{})
				if err != nil && !errors.IsNotFound(err) {
					// Delete failed with an error.
					klog.Errorf("Failed to delete pvc %q : %s", pvc.Name, err)
				}
			}
		},
	})

	// Set up event handlers for StatefulSet resource changes
	statefulSetInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.FilteringResourceEventHandler{
			FilterFunc: hasClusterLabel,

			Handler: cache.ResourceEventHandlerFuncs{
				// When a StatefulSet owned by a NdbCluster resource
				// is updated, either
				// a) it is ready, in which case, start the next
				//    reconciliation loop (or)
				// b) one or more pods status have become ready/unready,
				//    in which case the status of the NdbCluster resource
				//    needs to be updated, which also happens through a
				//    reconciliation loop.
				// So, add the NdbCluster item to the workqueue for both cases.
				UpdateFunc: func(oldObj, newObj interface{}) {
					oldStatefulSet := oldObj.(*appsv1.StatefulSet)
					newStatefulSet := newObj.(*appsv1.StatefulSet)

					if reflect.DeepEqual(oldStatefulSet.Status, newStatefulSet.Status) {
						// No updates to status => this event was triggered
						// by an update to the StatefulSet spec by the operator.
						// No need to enqueue for reconciliation.
						return
					}

					if oldStatefulSet.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType {
						oldStatus, newStatus := oldStatefulSet.Status, newStatefulSet.Status
						if (oldStatus.CurrentReplicas != newStatus.CurrentReplicas ||
							oldStatus.CurrentRevision != newStatus.CurrentRevision) &&
							oldStatus.UpdatedReplicas == newStatus.UpdatedReplicas &&
							oldStatus.UpdateRevision == newStatus.UpdateRevision &&
							oldStatus.ReadyReplicas == newStatus.ReadyReplicas &&
							oldStatus.Replicas == newStatus.Replicas &&
							oldStatus.ObservedGeneration == newStatus.ObservedGeneration {
							// https://github.com/kubernetes/kubernetes/issues/106055
							// CurrentRevision not updated if OnDelete update strategy is used.
							// But the CurrentReplicas get erroneously updated on some occasions.
							// Ignore any status update made only to those fields if the
							// UpdateStrategy is OnDelete.
							return
						}
					}

					controller.extractAndEnqueueNdbCluster(newStatefulSet, "StatefulSet", "updated")
				},

				// When a statefulset owned by a NdbCluster resource
				// is deleted, add the NdbCluster resource to the
				// workqueue to start the next reconciliation loop.
				DeleteFunc: func(obj interface{}) {
					sfset, ok := unwrapTombstone(obj).(*appsv1.StatefulSet)
					if !ok {
						klog.Errorf("Ignoring StatefulSet delete event with unexpected object type %T", obj)
						return
					}
					klog.Infof("StatefulSet %q is deleted", getNamespacedName(sfset))
					controller.extractAndEnqueueNdbCluster(sfset, "StatefulSet", "deleted")
				},
			},
		},

		// Set resyncPeriod to 0 to ignore all re-sync events
		0,
	)

	// Set up event handlers for ConfigMap updates
	configmapInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.FilteringResourceEventHandler{
			FilterFunc: hasClusterLabel,

			Handler: cache.ResourceEventHandlerFuncs{
				// A ConfigMap owned by an NdbCluster object was updated
				// Requeue owner for reconciliation
				UpdateFunc: func(oldObj, newObj interface{}) {
					newConfigMap := newObj.(*corev1.ConfigMap)
					controller.extractAndEnqueueNdbCluster(newConfigMap, "ConfigMap", "updated")
				},
			},
		},

		// Set resyncPeriod to 0 to ignore all re-sync events
		0,
	)

	// Set up event handlers for Pod Status changes
	podInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.FilteringResourceEventHandler{
			FilterFunc: hasClusterLabel,

			Handler: cache.ResourceEventHandlerFuncs{
				// When a pod owned by an NdbCluster resource fails or
				// recovers from an error, the NdbCluster status needs to be updated.
				UpdateFunc: func(oldObj, newObj interface{}) {
					oldPod := oldObj.(*corev1.Pod)
					newPod := newObj.(*corev1.Pod)
					if !reflect.DeepEqual(getPodErrors(oldPod), getPodErrors(newPod)) {
						// The error status of the Pod has changed.
						controller.extractAndEnqueueNdbCluster(newPod, "Pod", "updated")
					}
				},
			},
		},

		// Set resyncPeriod to 0 to ignore all re-sync events
		0,
	)

	return controller
}