shardingsphere-operator/pkg/controllers/chaos_controller.go (351 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package controllers import ( "context" "fmt" "reflect" "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1" "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh" "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap" "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/job" "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure" sschaos "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/chaos" "github.com/go-logr/logr" batchV1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) const ( ChaosControllerName = "chaos-controller" ChaosFinalizerName = "shardingsphere.apache.org/finalizer" ) // ChaosReconciler is a controller for the Chaos type ChaosReconciler struct { client.Client Scheme *runtime.Scheme Log logr.Logger Events record.EventRecorder ClientSet *clientset.Clientset Chaos chaosmesh.Chaos Job job.Job ExecCtrls []*ExecCtrl ConfigMap configmap.ConfigMap } // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=chaos,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=chaos/status,verbs=get;update;patch // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=chaos/finalizers,verbs=update // +kubebuilder:rbac:groups=chaos-mesh.org,resources=podchaos,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=chaos-mesh.org,resources=stresschaos,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=chaos-mesh.org,resources=networkchaos,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete // Reconcile handles main function of this controller func (r *ChaosReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := r.Log.WithValues(ChaosControllerName, req.NamespacedName) ssChaos, err := r.getRuntimeChaos(ctx, req.NamespacedName) if err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } logger.Info("start reconcile chaos") if ssChaos.ObjectMeta.DeletionTimestamp.IsZero() { if !controllerutil.ContainsFinalizer(ssChaos, ChaosFinalizerName) { controllerutil.AddFinalizer(ssChaos, ChaosFinalizerName) if err := r.Update(ctx, ssChaos); err != nil { return ctrl.Result{}, err } } } else if controllerutil.ContainsFinalizer(ssChaos, ChaosFinalizerName) { return r.finalize(ctx, ssChaos) } var errors []error if err := r.reconcileChaos(ctx, ssChaos); err != nil { errors = append(errors, err) logger.Error(err, "reconcile chaos error") } if err := r.reconcileStatus(ctx, ssChaos); err != nil { errors = append(errors, err) logger.Error(err, "failed to update status") } if len(errors) > 0 { return ctrl.Result{Requeue: true}, err } return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil } func (r *ChaosReconciler) reconcileChaos(ctx context.Context, chaos *v1alpha1.Chaos) error { logger := r.Log.WithValues("reconcile chaos", fmt.Sprintf("%s/%s", chaos.Namespace, chaos.Name)) namespacedName := types.NamespacedName{ Namespace: chaos.Namespace, Name: chaos.Name, } if chaos.Spec.EmbedChaos.PodChaos != nil { if err := r.reconcilePodChaos(ctx, chaos, namespacedName); err != nil { logger.Error(err, "reconcile pod chaos error") return err } } if chaos.Spec.EmbedChaos.NetworkChaos != nil { if err := r.reconcileNetworkChaos(ctx, chaos, namespacedName); err != nil { logger.Error(err, "reconcile network chaos error") return err } } return nil } func (r *ChaosReconciler) reconcileStatus(ctx context.Context, chaos *v1alpha1.Chaos) error { cur := chaos.Status.DeepCopy() if err := r.updateChaosCondition(ctx, chaos); err != nil { return err } if reflect.DeepEqual(cur, chaos.Status) { return nil } return r.Status().Update(ctx, chaos) } func (r *ChaosReconciler) updateChaosCondition(ctx context.Context, chaos *v1alpha1.Chaos) error { namespacedName := types.NamespacedName{ Namespace: chaos.Namespace, Name: chaos.Name, } if chaos.Spec.EmbedChaos.PodChaos != nil { switch chaos.Spec.EmbedChaos.PodChaos.Action { case v1alpha1.CPUStress: fallthrough case v1alpha1.MemoryStress: sc, err := r.Chaos.GetStressChaosByNamespacedName(ctx, namespacedName) if err != nil { return err } chaos.Status.ChaosCondition = chaosmesh.ConvertChaosStatus(ctx, chaos, sc) case v1alpha1.PodFailure: fallthrough case v1alpha1.PodKill: fallthrough case v1alpha1.ContainerKill: pc, err := r.Chaos.GetPodChaosByNamespacedName(ctx, namespacedName) if err != nil { return err } chaos.Status.ChaosCondition = chaosmesh.ConvertChaosStatus(ctx, chaos, pc) } } if chaos.Spec.EmbedChaos.NetworkChaos != nil { nc, err := r.Chaos.GetNetworkChaosByNamespacedName(ctx, namespacedName) if err != nil { return err } chaos.Status.ChaosCondition = chaosmesh.ConvertChaosStatus(ctx, chaos, nc) } return nil } type ExecCtrl struct { cancel context.CancelFunc pressure *pressure.Pressure } func makeExecName(namespacedName types.NamespacedName, execType string) string { return fmt.Sprintf("%s-%s-%s", namespacedName.Namespace, namespacedName.Name, execType) } func (r *ChaosReconciler) getRuntimeChaos(ctx context.Context, name types.NamespacedName) (*v1alpha1.Chaos, error) { var rt = &v1alpha1.Chaos{} err := r.Get(ctx, name, rt) return rt, err } // nolint:nestif func (r *ChaosReconciler) finalize(ctx context.Context, ssChaos *v1alpha1.Chaos) (ctrl.Result, error) { namespacedName := types.NamespacedName{ Namespace: ssChaos.Namespace, Name: ssChaos.Name, } r.deleteExec(namespacedName) if err := r.deleteExternalResources(ctx, ssChaos); err != nil { return ctrl.Result{}, err } controllerutil.RemoveFinalizer(ssChaos, ChaosFinalizerName) if err := r.Update(ctx, ssChaos); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil } func (r *ChaosReconciler) deleteExternalResources(ctx context.Context, chao *v1alpha1.Chaos) error { nameSpacedName := types.NamespacedName{Namespace: chao.Namespace, Name: chao.Name} if chao.Spec.EmbedChaos.PodChaos != nil { switch chao.Spec.EmbedChaos.PodChaos.Action { case v1alpha1.CPUStress: fallthrough case v1alpha1.MemoryStress: if err := r.deleteStressChaos(ctx, nameSpacedName); err != nil { return err } case v1alpha1.PodFailure: fallthrough case v1alpha1.PodKill: fallthrough case v1alpha1.ContainerKill: if err := r.deletePodChaos(ctx, nameSpacedName); err != nil { return err } } return nil } if chao.Spec.EmbedChaos.NetworkChaos != nil { if err := r.deleteNetworkChaos(ctx, nameSpacedName); err != nil { return err } return nil } return nil } func (r *ChaosReconciler) deleteExec(namespacedName types.NamespacedName) { steady, chaos := makeExecName(namespacedName, string(sschaos.InSteady)), makeExecName(namespacedName, string(sschaos.InChaos)) execR := make([]*ExecCtrl, 0, len(r.ExecCtrls)) for i := range r.ExecCtrls { exec := r.ExecCtrls[i].pressure if exec.Name == steady || exec.Name == chaos { r.ExecCtrls[i].cancel() continue } execR = append(execR, r.ExecCtrls[i]) } r.ExecCtrls = execR } func (r *ChaosReconciler) deletePodChaos(ctx context.Context, namespacedName types.NamespacedName) error { podchao, err := r.getPodChaosByNamespacedName(ctx, namespacedName) if err != nil { return err } if podchao != nil { if err := r.Chaos.DeletePodChaos(ctx, podchao); err != nil { return err } } return nil } func (r *ChaosReconciler) deleteNetworkChaos(ctx context.Context, namespacedName types.NamespacedName) error { networkchao, err := r.getNetworkChaosByNamespacedName(ctx, namespacedName) if err != nil { return err } if networkchao != nil { if err := r.Chaos.DeleteNetworkChaos(ctx, networkchao); err != nil { return err } } return nil } func (r *ChaosReconciler) deleteStressChaos(ctx context.Context, namespacedName types.NamespacedName) error { sc, err := r.getStressChaosByNamespacedName(ctx, namespacedName) if err != nil { return err } if sc != nil { if err := r.Chaos.DeleteStressChaos(ctx, sc); err != nil { return err } } return nil } func (r *ChaosReconciler) reconcilePodChaos(ctx context.Context, chaos *v1alpha1.Chaos, namespacedName types.NamespacedName) error { switch chaos.Spec.EmbedChaos.PodChaos.Action { case v1alpha1.PodFailure: fallthrough case v1alpha1.ContainerKill: fallthrough case v1alpha1.PodKill: pc, err := r.getPodChaosByNamespacedName(ctx, namespacedName) if err != nil { return err } if pc != nil { return r.updatePodChaos(ctx, chaos, pc) } return r.createPodChaos(ctx, chaos) case v1alpha1.CPUStress: fallthrough case v1alpha1.MemoryStress: sc, err := r.getStressChaosByNamespacedName(ctx, namespacedName) if err != nil { return err } if sc != nil { return r.updateStressChaos(ctx, chaos, sc) } return r.createStressChaos(ctx, chaos) } return nil } func (r *ChaosReconciler) getPodChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (chaosmesh.PodChaos, error) { pc, err := r.Chaos.GetPodChaosByNamespacedName(ctx, namespacedName) if err != nil { return nil, err } return pc, nil } func (r *ChaosReconciler) createPodChaos(ctx context.Context, chaos *v1alpha1.Chaos) error { err := r.Chaos.CreatePodChaos(ctx, chaos) if err != nil { return err } r.Events.Event(chaos, "Normal", "Created", fmt.Sprintf("PodChaos %s", " is created successfully")) return nil } func (r *ChaosReconciler) updatePodChaos(ctx context.Context, chaos *v1alpha1.Chaos, podChaos chaosmesh.PodChaos) error { err := r.Chaos.UpdatePodChaos(ctx, podChaos, chaos) if err != nil { return err } return nil } func (r *ChaosReconciler) reconcileNetworkChaos(ctx context.Context, chaos *v1alpha1.Chaos, namespacedName types.NamespacedName) error { nc, err := r.getNetworkChaosByNamespacedName(ctx, namespacedName) if err != nil { return err } if nc != nil { return r.updateNetWorkChaos(ctx, chaos, nc) } return r.createNetworkChaos(ctx, chaos) } func (r *ChaosReconciler) updateNetWorkChaos(ctx context.Context, chaos *v1alpha1.Chaos, networkChaos chaosmesh.NetworkChaos) error { err := r.Chaos.UpdateNetworkChaos(ctx, networkChaos, chaos) if err != nil { return err } return nil } func (r *ChaosReconciler) createNetworkChaos(ctx context.Context, chaos *v1alpha1.Chaos) error { err := r.Chaos.CreateNetworkChaos(ctx, chaos) if err != nil { return err } r.Events.Event(chaos, "Normal", "created", fmt.Sprintf("NetworkChaos %s", " is created successfully")) return nil } func (r *ChaosReconciler) getNetworkChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (chaosmesh.NetworkChaos, error) { nc, err := r.Chaos.GetNetworkChaosByNamespacedName(ctx, namespacedName) if err != nil { return nil, err } return nc, nil } func (r *ChaosReconciler) getStressChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (chaosmesh.StressChaos, error) { pc, err := r.Chaos.GetStressChaosByNamespacedName(ctx, namespacedName) if err != nil { return nil, err } return pc, nil } func (r *ChaosReconciler) createStressChaos(ctx context.Context, chaos *v1alpha1.Chaos) error { err := r.Chaos.CreateStressChaos(ctx, chaos) if err != nil { return err } r.Events.Event(chaos, "Normal", "Created", fmt.Sprintf("StressChaos %s", " is created successfully")) return nil } func (r *ChaosReconciler) updateStressChaos(ctx context.Context, chaos *v1alpha1.Chaos, stress chaosmesh.StressChaos) error { err := r.Chaos.UpdateStressChaos(ctx, stress, chaos) if err != nil { return err } return nil } // SetupWithManager sets up the controller with the Manager. func (r *ChaosReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.Chaos{}). Owns(&corev1.ConfigMap{}). Owns(&batchV1.Job{}). Complete(r) }