oracle/controllers/databasecontroller/database_controller.go (298 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package databasecontroller import ( "context" "fmt" "sync" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" commonv1alpha1 "github.com/GoogleCloudPlatform/elcarro-oracle-operator/common/api/v1alpha1" v1alpha1 "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/api/v1alpha1" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/controllers" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/controllers/instancecontroller" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/common/sql" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/k8s" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/util" ) // These variables allow to plug in mock objects for functional tests var ( skipLBCheckForTest = false CheckStatusInstanceFunc = controllers.CheckStatusInstanceFunc ) // DatabaseReconciler reconciles a Database object type DatabaseReconciler struct { client.Client Log logr.Logger Scheme *runtime.Scheme Recorder record.EventRecorder InstanceLocks *sync.Map DatabaseClientFactory controllers.DatabaseClientFactory } func (r *DatabaseReconciler) findPod(ctx context.Context, namespace, instName string) (*corev1.PodList, error) { // List the Pods matching the PodTemplate Labels var pods corev1.PodList if err := r.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{"instance": instName, "task-type": controllers.DatabaseTaskType}); err != nil { return nil, err } return &pods, nil } func findContainer(pod corev1.Pod, c string) (*corev1.Container, error) { for _, con := range pod.Spec.Containers { if con.Name == c { return &con, nil } } return nil, fmt.Errorf("failed to find a container %s in a pod: %v", c, pod) } // updateIsChangeApplied sets status.IsChangeApplied field to false if observedGeneration < generation, it sets it to true if changes are applied. func (r *DatabaseReconciler) updateIsChangeApplied(ctx context.Context, db *v1alpha1.Database) { if db.Status.ObservedGeneration < db.Generation { db.Status.IsChangeApplied = v1.ConditionFalse db.Status.ObservedGeneration = db.Generation r.Log.Info("change detected", "observedGeneration", db.Status.ObservedGeneration, "generation", db.Generation) } if db.Status.IsChangeApplied == v1.ConditionTrue { return } userUpdateDone := k8s.ConditionStatusEquals(k8s.FindCondition(db.Status.Conditions, k8s.UserReady), v1.ConditionTrue) if userUpdateDone { db.Status.IsChangeApplied = v1.ConditionTrue } r.Log.Info("change applied", "observedGeneration", db.Status.ObservedGeneration, "generation", db.Generation) } // +kubebuilder:rbac:groups=database.oracle.db.anthosapis.com,resources=databases,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=database.oracle.db.anthosapis.com,resources=databases/status,verbs=get;update;patch // +kubebuilder:rbac:groups=core,resources=services,verbs=list;watch;get;patch // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch // +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=pods/status,verbs=get;update;patch // +kubebuilder:rbac:groups="",resources=pods/exec,verbs=get;list;create;update;patch // +kubebuilder:rbac:groups="",resources=pods/log,verbs=get;list // Reconcile is the main method that reconciles the Database resource. func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.Log.WithValues("Database", req.NamespacedName) var db v1alpha1.Database if err := r.Get(ctx, req.NamespacedName, &db); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } if !db.DeletionTimestamp.IsZero() { return r.ReconcileDatabaseDeletion(ctx, req, log) } return r.ReconcileDatabaseCreation(ctx, req, log) } func (r *DatabaseReconciler) ReconcileDatabaseDeletion(ctx context.Context, req ctrl.Request, log logr.Logger) (ctrl.Result, error) { log.Info("reconciling Database (PDB) deletion...") var db v1alpha1.Database if err := r.Get(ctx, req.NamespacedName, &db); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } // Find an Instance resource that the Database belongs to. var inst v1alpha1.Instance if err := r.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: db.Spec.Instance}, &inst); err != nil { return ctrl.Result{}, err } log.Info("deleting a database(PDB) from its parent Instance", "DatabaseName", db.Name, "InstanceName", inst.Name) if controllerutil.ContainsFinalizer(&db, controllers.FinalizerName) { if !instancecontroller.IsStopped(&inst) { deleteReq := controllers.DeleteDatabaseRequest{ Name: db.Spec.Name, DbDomain: controllers.GetDBDomain(&inst), } err := controllers.DeleteDatabase(ctx, r, r.DatabaseClientFactory, req.Namespace, inst.Name, deleteReq) if err != nil { return ctrl.Result{}, err } } // Remove PDB from list of DatabaseNames if util.Contains(inst.Status.DatabaseNames, db.Spec.Name) { inst.Status.DatabaseNames = util.Filter(inst.Status.DatabaseNames, db.Spec.Name) } if err := r.Status().Update(ctx, &inst); err != nil { log.Error(err, "failed to update the Instance status after deleting a Database(PDB)", "DatabaseName", db.Name, "InstanceName", inst.Name) return ctrl.Result{}, err } controllerutil.RemoveFinalizer(&db, controllers.FinalizerName) if err := r.Update(ctx, &db); err != nil { return ctrl.Result{}, err } } return ctrl.Result{}, nil } func (r *DatabaseReconciler) ReconcileDatabaseCreation(ctx context.Context, req ctrl.Request, log logr.Logger) (ctrl.Result, error) { log.Info("reconciling database") var db v1alpha1.Database if err := r.Get(ctx, req.NamespacedName, &db); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } if err := validateSpec(&db); err != nil { return ctrl.Result{}, r.handlePreflightCheckError(ctx, &db, err) } // Find an Instance resource that the Database belongs to. var inst v1alpha1.Instance if err := r.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: db.Spec.Instance}, &inst); err != nil { return ctrl.Result{}, r.handlePreflightCheckError(ctx, &db, fmt.Errorf("failed to find instance %q for database %q", db.Spec.Instance, db.Name)) } log.Info("using the following instance to create a new database(PDB)", "db.Spec.Instance", db.Spec.Instance, "inst", inst) DBDomain := controllers.GetDBDomain(&inst) // Find a pod running a database container. pods, err := r.findPod(ctx, req.Namespace, db.Spec.Instance) if err != nil { return ctrl.Result{}, r.handlePreflightCheckError(ctx, &db, fmt.Errorf("failed to find a pod")) } log.V(2).Info("found a pod", "pods", pods) if len(pods.Items) != 1 { return ctrl.Result{}, r.handlePreflightCheckError(ctx, &db, fmt.Errorf("expected 1 pod, found %d", len(pods.Items))) } // Find a database container within that pod. if _, err := findContainer(pods.Items[0], controllers.DatabaseContainerName); err != nil { log.Error(err, "reconciling database - failed to find a database container") return ctrl.Result{}, err } log.V(1).Info("a database container identified") // CDBName is specified in Instance specs cdbName := inst.Spec.CDBName istatus, err := CheckStatusInstanceFunc(ctx, r, r.DatabaseClientFactory, db.Spec.Instance, cdbName, inst.Namespace, "", DBDomain, log) if err != nil { log.Error(err, "preflight check failed", "check the database instance status", "failed") return ctrl.Result{}, err } if istatus != controllers.StatusReady { return ctrl.Result{}, r.handlePreflightCheckError(ctx, &db, fmt.Errorf("database instance doesn't appear to be ready yet")) } log.Info("preflight check: database instance is ready") // Confirm that an external LB is ready. lbSvc := &corev1.Service{} if err := r.Get(ctx, types.NamespacedName{Name: fmt.Sprintf(controllers.SvcName, db.Spec.Instance), Namespace: req.NamespacedName.Namespace}, lbSvc); err != nil { return ctrl.Result{}, err } if len(lbSvc.Status.LoadBalancer.Ingress) == 0 && !skipLBCheckForTest { return ctrl.Result{}, fmt.Errorf("preflight check: createDatabase: external LB is NOT ready") } log.Info("preflight check: createDatabase external LB service is ready", "svcName", lbSvc.Name) // Add finalizer to clean up the underlying PDB in case of deletion. if !controllerutil.ContainsFinalizer(&db, controllers.FinalizerName) { log.Info("adding a finalizer to the Database object.") controllerutil.AddFinalizer(&db, controllers.FinalizerName) if err := r.Update(ctx, &db); err != nil { return ctrl.Result{}, err } } alreadyExists, err := NewDatabase(ctx, r, &db, DBDomain, cdbName, log) if err != nil { return ctrl.Result{}, err } r.Recorder.Eventf(&db, corev1.EventTypeNormal, k8s.CreatedDatabase, fmt.Sprintf("Created new database %q", db.Spec.Name)) db.Status.Phase = commonv1alpha1.DatabaseReady db.Status.Conditions = k8s.Upsert(db.Status.Conditions, k8s.Ready, v1.ConditionTrue, k8s.CreateComplete, "") if err := r.Status().Update(ctx, &db); err != nil { return ctrl.Result{}, err } if alreadyExists { if err := SyncUsers(ctx, r, &db, cdbName, log); err != nil { log.Error(err, "failed to sync database") return ctrl.Result{}, err } return ctrl.Result{}, nil } log.V(1).Info("[DEBUG] create users", "Database", db.Spec.Name, "Users/Privs", db.Spec.Users) if err := NewUsers(ctx, r, &db, DBDomain, cdbName, log); err != nil { return ctrl.Result{}, err } // check DB name against existing ones to decide whether this is a new DB if !util.Contains(inst.Status.DatabaseNames, db.Spec.Name) { log.Info("found a new DB", "dbName", db.Spec.Name) inst.Status.DatabaseNames = append(inst.Status.DatabaseNames, db.Spec.Name) } else { log.V(1).Info("not a new DB, skipping the update", "dbName", db.Spec.Name) } log.Info("instance status", "conditions", inst.Status.Conditions, "endpoint", inst.Status.Endpoint, "url", inst.Status.URL, "databases", inst.Status.DatabaseNames) if err := r.Status().Update(ctx, &inst); err != nil { log.Error(err, "failed to update an Instance status") return ctrl.Result{}, err } log.Info("reconciling database: DONE") return ctrl.Result{}, nil } func (r *DatabaseReconciler) instanceToDatabases(obj client.Object) []ctrl.Request { var requests []ctrl.Request for _, name := range obj.(*v1alpha1.Instance).Status.DatabaseNames { requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ Name: name, Namespace: obj.GetNamespace(), }}) } r.Log.Info("Instance event triggered reconcile ", "requests", requests) return requests } // SetupWithManager starts the reconciler loop. func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error { // UpdateFunc is used to judge if instance event is a 'DatabaseInstanceReady' event. If that is true, the event will be processed by the database reconciler databaseInstanceReadyPredicate := predicate.Funcs{ UpdateFunc: func(e event.UpdateEvent) bool { oldInstance, ok := e.ObjectOld.(*v1alpha1.Instance) if !ok { r.Log.Info("Expected instance", "type", e.ObjectOld.GetObjectKind().GroupVersionKind().String()) return false } if cond := k8s.FindCondition(oldInstance.Status.Conditions, k8s.DatabaseInstanceReady); k8s.ConditionStatusEquals(cond, v1.ConditionTrue) { return false } newInstance, ok := e.ObjectNew.(*v1alpha1.Instance) if !ok { r.Log.Info("Expected instance", "type", e.ObjectNew.GetObjectKind().GroupVersionKind().String()) return false } if cond := k8s.FindCondition(newInstance.Status.Conditions, k8s.DatabaseInstanceReady); !k8s.ConditionStatusEquals(cond, v1.ConditionTrue) { return false } r.Log.Info("DatabaseInstanceReady changes to true") return true }, CreateFunc: func(e event.CreateEvent) bool { return false }, DeleteFunc: func(e event.DeleteEvent) bool { return false }, GenericFunc: func(e event.GenericEvent) bool { return false }, } // We watch the instance event so we can trigger database creation when instance is ready. // Add a databaseInstanceReadyPredicate to avoid constantly triggering reconciliation for every instance event. return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.Database{}). Owns(&corev1.Service{}). Watches( &source.Kind{Type: &v1alpha1.Instance{}}, handler.EnqueueRequestsFromMapFunc(r.instanceToDatabases), builder.WithPredicates(databaseInstanceReadyPredicate), ). Complete(r) } func (r *DatabaseReconciler) handlePreflightCheckError(ctx context.Context, db *v1alpha1.Database, err error) error { r.Log.Error(err, "database preflightCheck failed") r.Recorder.Eventf(db, corev1.EventTypeWarning, k8s.CreatePending, err.Error()) db.Status.Phase = commonv1alpha1.DatabaseCreating db.Status.Conditions = k8s.Upsert(db.Status.Conditions, k8s.Ready, v1.ConditionFalse, k8s.CreatePending, err.Error()) if err := r.Status().Update(ctx, db); err != nil { r.Log.Error(err, "failed to update database status") } return err } // validateSpec validates the database spec. func validateSpec(db *v1alpha1.Database) error { // Currently only support validate db spec for user credentials. // no sensitive information is logged underlying. if (db.Spec.AdminPassword != "") && (db.Spec.AdminPasswordGsmSecretRef != nil) { return fmt.Errorf("resources/validateSpec: invalid database admin password spec; you can only specify either admin_password or adminPasswordGsmSecretRef") } for _, u := range db.Spec.Users { if (u.Password != "") && (u.GsmSecretRef != nil) { return fmt.Errorf("resources/validateSpec: invalid database user password spec for user %q; you can only specify either password or GsmSecretRef", u.Name) } } if _, err := sql.Identifier(db.Spec.Name); err != nil { return fmt.Errorf("resources/validateSpec: pdb name is not valid: %w", err) } if db.Spec.AdminPassword != "" { if _, err := sql.Identifier(db.Spec.AdminPassword); err != nil { return fmt.Errorf("resources/validateSpec: admin_password is not valid: %w", err) } } for _, u := range db.Spec.Users { if _, err := sql.ObjectName(u.Name); err != nil { return fmt.Errorf("resources/validateSpec: invalid user %q: %w", u.Name, err) } if u.Password != "" { if _, err := sql.Identifier(u.Password); err != nil { return fmt.Errorf("resources/validateSpec: password for user %q is not valid: %w", u.Name, err) } } for _, privilege := range u.Privileges { if !sql.IsPrivilege(string(privilege)) { return fmt.Errorf("resources/validateSpec: invalid privilege %q for user %q", privilege, u.Name) } } } return nil }