pkg/controller/console/console_controller.go (147 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package console import ( "context" "fmt" "reflect" "time" rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1" cons "github.com/apache/rocketmq-operator/pkg/constants" "github.com/apache/rocketmq-operator/pkg/share" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) var log = logf.Log.WithName("controller_console") /** * USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller * business logic. Delete these comments after modifying this file.* */ // SetupWithManager creates a new Console Controller and adds it to the Manager. The Manager will set fields on the Controller // and Start it when the Manager is Started. func SetupWithManager(mgr manager.Manager) error { return add(mgr, newReconciler(mgr)) } // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) reconcile.Reconciler { return &ReconcileConsole{client: mgr.GetClient(), scheme: mgr.GetScheme()} } // add adds a new Controller to mgr with r as the reconcile.Reconciler func add(mgr manager.Manager, r reconcile.Reconciler) error { // Create a new controller c, err := controller.New("console-controller", mgr, controller.Options{Reconciler: r}) if err != nil { return err } // Watch for changes to primary resource Console err = c.Watch(&source.Kind{Type: &rocketmqv1alpha1.Console{}}, &handler.EnqueueRequestForObject{}) if err != nil { return err } // TODO(user): Modify this to be the types you create that are owned by the primary resource // Watch for changes to secondary resource Pods and requeue the owner Console err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{ IsController: true, OwnerType: &rocketmqv1alpha1.Console{}, }) if err != nil { return err } return nil } //+kubebuilder:rbac:groups=rocketmq.apache.org,resources=consoles,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=rocketmq.apache.org,resources=consoles/status,verbs=get;update;patch //+kubebuilder:rbac:groups=rocketmq.apache.org,resources=consoles/finalizers,verbs=update //+kubebuilder:rbac:groups="apps",resources=deployments,verbs=get;list;watch;create;update;patch;delete // ReconcileConsole reconciles a Console object type ReconcileConsole struct { // This client, initialized using mgr.Client() above, is a split client // that reads objects from the cache and writes to the apiserver client client.Client scheme *runtime.Scheme } // Reconcile reads that state of the cluster for a Console object and makes changes based on the state read // and what is in the Console.Spec // TODO(user): Modify this Reconcile function to implement your Controller logic. This example creates // a Pod as an example // Note: // The Controller will requeue the Request to be processed again if the returned error is non-nil or // Result.Requeue is true, otherwise upon completion it will remove the work from the queue. func (r *ReconcileConsole) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name) reqLogger.Info("Reconciling Console") // Fetch the Console instance instance := &rocketmqv1alpha1.Console{} err := r.client.Get(context.TODO(), request.NamespacedName, instance) if err != nil { if errors.IsNotFound(err) { // Request object not found, could have been deleted after reconcile request. // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers. // Return and don't requeue return reconcile.Result{}, nil } // Error reading the object - requeue the request. return reconcile.Result{}, err } if instance.Spec.NameServers == "" { // wait for name server ready if nameServers is omitted for { if share.IsNameServersStrInitialized { break } else { log.Info("Waiting for name server ready...") time.Sleep(time.Duration(cons.WaitForNameServerReadyInSecond) * time.Second) } } } else { share.NameServersStr = instance.Spec.NameServers } consoleDeployment := newDeploymentForCR(instance) // Set Console instance as the owner and controller if err := controllerutil.SetControllerReference(instance, consoleDeployment, r.scheme); err != nil { return reconcile.Result{}, err } // Check if this Pod already exists found := &appsv1.Deployment{} err = r.client.Get(context.TODO(), types.NamespacedName{Name: consoleDeployment.Name, Namespace: consoleDeployment.Namespace}, found) if err != nil && errors.IsNotFound(err) { reqLogger.Info("Creating RocketMQ Console Deployment", "Namespace", consoleDeployment, "Name", consoleDeployment.Name) err = r.client.Create(context.TODO(), consoleDeployment) if err != nil { return reconcile.Result{}, err } // created successfully - don't requeue return reconcile.Result{}, nil } else if err != nil { return reconcile.Result{}, err } // Support console deployment update if !reflect.DeepEqual(instance.Spec.ConsoleDeployment.Spec.Replicas, found.Spec.Replicas) || !reflect.DeepEqual(instance.Spec.ConsoleDeployment.Spec.Template.Spec.Containers[0].Resources, found.Spec.Template.Spec.Containers[0].Resources) { found.Spec.Replicas = instance.Spec.ConsoleDeployment.Spec.Replicas found.Spec.Template.Spec.Containers[0].Resources = instance.Spec.ConsoleDeployment.Spec.Template.Spec.Containers[0].Resources err = r.client.Update(context.TODO(), found) if err != nil { reqLogger.Error(err, "Failed to update console CR", "Namespace", found.Namespace, "Name", found.Name) } else { reqLogger.Info("Successfully updated console CR", "Namespace", found.Namespace, "Name", found.Name) } } // TODO: update console if name server address changes // CR already exists - don't requeue reqLogger.Info("Skip reconcile: RocketMQ Console Deployment already exists", "Namespace", found.Namespace, "Name", found.Name) return reconcile.Result{}, nil } // newDeploymentForCR returns a deployment pod with modifying the ENV func newDeploymentForCR(cr *rocketmqv1alpha1.Console) *appsv1.Deployment { env := corev1.EnvVar{ Name: "JAVA_OPTS", Value: fmt.Sprintf("-Drocketmq.namesrv.addr=%s -Dcom.rocketmq.sendMessageWithVIPChannel=false", share.NameServersStr), } dep := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: cr.Name, Namespace: cr.Namespace, }, Spec: appsv1.DeploymentSpec{ Replicas: cr.Spec.ConsoleDeployment.Spec.Replicas, Selector: &metav1.LabelSelector{ MatchLabels: cr.Spec.ConsoleDeployment.Spec.Selector.MatchLabels, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: cr.Spec.ConsoleDeployment.Spec.Template.ObjectMeta.Labels, Annotations: cr.Spec.ConsoleDeployment.Spec.Template.ObjectMeta.Annotations, }, Spec: corev1.PodSpec{ ServiceAccountName: cr.Spec.ConsoleDeployment.Spec.Template.Spec.ServiceAccountName, Affinity: cr.Spec.ConsoleDeployment.Spec.Template.Spec.Affinity, ImagePullSecrets: cr.Spec.ConsoleDeployment.Spec.Template.Spec.ImagePullSecrets, Containers: []corev1.Container{{ Resources: cr.Spec.ConsoleDeployment.Spec.Template.Spec.Containers[0].Resources, Image: cr.Spec.ConsoleDeployment.Spec.Template.Spec.Containers[0].Image, Args: cr.Spec.ConsoleDeployment.Spec.Template.Spec.Containers[0].Args, Name: cr.Spec.ConsoleDeployment.Spec.Template.Spec.Containers[0].Name, ImagePullPolicy: cr.Spec.ConsoleDeployment.Spec.Template.Spec.Containers[0].ImagePullPolicy, Env: append(cr.Spec.ConsoleDeployment.Spec.Template.Spec.Containers[0].Env, env), Ports: cr.Spec.ConsoleDeployment.Spec.Template.Spec.Containers[0].Ports, VolumeMounts: cr.Spec.ConsoleDeployment.Spec.Template.Spec.Containers[0].VolumeMounts, }}, Volumes: cr.Spec.ConsoleDeployment.Spec.Template.Spec.Volumes, }, }, }, } return dep }