in pkg/controller/controller/dledger_controller.go [113:230]
func (r *ReconcileController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
reqLogger.Info("Reconciling Controller.")
// Fetch the Controller instance
controller := &rocketmqv1alpha1.Controller{}
err := r.client.Get(context.TODO(), request.NamespacedName, controller)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
// Return and don't requeue
reqLogger.Info("Controller resource not found. Ignoring since object must be deleted.")
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
reqLogger.Error(err, "Failed to get Controller.")
return reconcile.Result{RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, err
}
//create headless svc
headlessSvc := &corev1.Service{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: tool.BuildHeadlessSvcResourceName(request.Name), Namespace: request.Namespace}, headlessSvc)
if err != nil {
if errors.IsNotFound(err) {
// create;
consoleSvc := r.generateHeadlessSvc(controller)
err = r.client.Create(context.TODO(), consoleSvc)
if err != nil {
reqLogger.Error(err, "Failed to create controller headless svc")
return reconcile.Result{}, err
} else {
reqLogger.Info("Successfully create controller headless svc")
}
} else {
return reconcile.Result{}, err
}
}
sts := r.getControllerStatefulSet(controller)
// Check if the statefulSet already exists, if not create a new one
found := &appsv1.StatefulSet{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: sts.Name, Namespace: sts.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Controller StatefulSet.", "StatefulSet.Namespace", sts.Namespace, "StatefulSet.Name", sts.Name)
err = r.client.Create(context.TODO(), sts)
if err != nil {
reqLogger.Error(err, "Failed to create new Controller StatefulSet", "StatefulSet.Namespace", sts.Namespace, "StatefulSet.Name", sts.Name)
}
} else if err != nil {
reqLogger.Error(err, "Failed to list Controller StatefulSet.")
}
// List the pods for this controller's statefulSet
podList := &corev1.PodList{}
labelSelector := labels.SelectorFromSet(labelsForController(controller.Name))
listOps := &client.ListOptions{
Namespace: controller.Namespace,
LabelSelector: labelSelector,
}
err = r.client.List(context.TODO(), podList, listOps)
if err != nil {
reqLogger.Error(err, "Failed to list pods.", "Controller.Namespace", controller.Namespace, "Controller.Name", controller.Name)
return reconcile.Result{}, err
}
podNames := getPodNames(podList.Items)
log.Info("controller.Status.Nodes length = " + strconv.Itoa(len(controller.Status.Nodes)))
log.Info("podNames length = " + strconv.Itoa(len(podNames)))
// Ensure every pod is in running phase
for _, pod := range podList.Items {
if !reflect.DeepEqual(pod.Status.Phase, corev1.PodRunning) {
log.Info("pod " + pod.Name + " phase is " + string(pod.Status.Phase) + ", wait for a moment...")
}
}
// Update status.Size if needed
if controller.Spec.Size != controller.Status.Size {
log.Info("controller.Status.Size = " + strconv.Itoa(controller.Status.Size))
log.Info("controller.Spec.Size = " + strconv.Itoa(controller.Spec.Size))
controller.Status.Size = controller.Spec.Size
err = r.client.Status().Update(context.TODO(), controller)
if err != nil {
reqLogger.Error(err, "Failed to update Controller Size status.")
}
}
// Update status.Nodes if needed
if !reflect.DeepEqual(podNames, controller.Status.Nodes) {
controller.Status.Nodes = podNames
err = r.client.Status().Update(context.TODO(), controller)
if err != nil {
reqLogger.Error(err, "Failed to update Controller Nodes status.")
}
}
//create svc
controllerSvc := &corev1.Service{}
controllerSvcName := tool.BuildSvcResourceName(request.Name)
err = r.client.Get(context.TODO(), types.NamespacedName{Name: controllerSvcName, Namespace: request.Namespace}, controllerSvc)
if err != nil {
if errors.IsNotFound(err) {
// create;
svcToCreate := r.generateSvc(controller)
err = r.client.Create(context.TODO(), svcToCreate)
if err != nil {
reqLogger.Error(err, "Failed to create controller svc")
return reconcile.Result{}, err
} else {
reqLogger.Info("Successfully create controller svc")
}
} else {
return reconcile.Result{}, err
}
}
share.ControllerAccessPoint = controllerSvcName + ":9878"
return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
}