oracle/controllers/instancecontroller/utils.go

// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package instancecontroller

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
	"strings"
	"time"

	commonv1alpha1 "github.com/GoogleCloudPlatform/elcarro-oracle-operator/common/api/v1alpha1"
	"github.com/GoogleCloudPlatform/elcarro-oracle-operator/common/pkg/utils"
	"github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/api/v1alpha1"
	"github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/controllers"
	"github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/common/sql"
	"github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/consts"
	dbdpb "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/oracle"
	"github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/security"
	"github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/k8s"
	"github.com/go-logr/logr"
	"github.com/google/go-cmp/cmp"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/utils/pointer"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// loadConfig attempts to find a customer-specific Operator config,
// if one has been provided. There should be at most one config.
// If no config is provided by a customer, no errors are raised and
// all defaults are assumed.
func (r *InstanceReconciler) loadConfig(ctx context.Context, ns string) (*v1alpha1.Config, error) {
	var configs v1alpha1.ConfigList
	if err := r.List(ctx, &configs, client.InNamespace(ns)); err != nil {
		return nil, err
	}

	if len(configs.Items) == 0 {
		return nil, nil
	}

	if len(configs.Items) != 1 {
		return nil, fmt.Errorf("number of customer provided configs is not one: %d", len(configs.Items))
	}

	return &configs.Items[0], nil
}

func (r *InstanceReconciler) updateProgressCondition(ctx context.Context, inst v1alpha1.Instance, ns, op string, log logr.Logger) bool {
	iReadyCond := k8s.FindCondition(inst.Status.Conditions, k8s.Ready)

	log.Info("updateProgressCondition", "operation", op, "iReadyCond", iReadyCond)
	progress, err := r.statusProgress(ctx, ns, fmt.Sprintf(controllers.StsName, inst.Name), log)
	if iReadyCond != nil && progress > 0 {
		k8s.InstanceUpsertCondition(&inst.Status, iReadyCond.Type, iReadyCond.Status, iReadyCond.Reason, fmt.Sprintf("%s: %d%%", op, progress))
		log.Info("updateProgressCondition", "statusProgress", err)
	}
	return err == nil
}

// updateIsChangeApplied sets the instance.Status.IsChangeApplied field to false if
// observedGeneration < generation; it sets it back to true once the changes are applied.
// TODO: add logic to handle restore/recovery
func (r *InstanceReconciler) updateIsChangeApplied(inst *v1alpha1.Instance, log logr.Logger) {
	if inst.Status.ObservedGeneration < inst.Generation {
		log.Info("change detected", "observedGeneration", inst.Status.ObservedGeneration, "generation", inst.Generation)
		inst.Status.IsChangeApplied = v1.ConditionFalse
		inst.Status.ObservedGeneration = inst.Generation
	}
	if inst.Status.IsChangeApplied == v1.ConditionTrue {
		return
	}

	parameterUpdateDone := inst.Spec.Parameters == nil || reflect.DeepEqual(inst.Status.CurrentParameters, inst.Spec.Parameters)
	if parameterUpdateDone {
		inst.Status.IsChangeApplied = v1.ConditionTrue
	}
	log.Info("change applied", "observedGeneration", inst.Status.ObservedGeneration, "generation", inst.Generation)
}

func (r *InstanceReconciler) createStatefulSet(ctx context.Context, inst *v1alpha1.Instance, sp controllers.StsParams, applyOpts []client.PatchOption, log logr.Logger) (ctrl.Result, error) {
	newPVCs, err := controllers.NewPVCs(sp)
	if err != nil {
		log.Error(err, "NewPVCs failed")
		return ctrl.Result{}, err
	}
	newPodTemplate := controllers.NewPodTemplate(sp, *inst)
	sts, err := controllers.NewSts(sp, newPVCs, newPodTemplate)
	if err != nil {
		log.Error(err, "failed to create a StatefulSet", "sts", sts)
		return ctrl.Result{}, err
	}
	log.Info("StatefulSet constructed", "sts", sts, "sts.Status", sts.Status, "inst.Status", inst.Status)

	baseSTS := &appsv1.StatefulSet{}
	sts.DeepCopyInto(baseSTS)
	if _, err := ctrl.CreateOrUpdate(ctx, r, baseSTS, func() error {
		sts.Spec.DeepCopyInto(&baseSTS.Spec)
		return nil
	}); err != nil {
		log.Error(err, "failed to create the StatefulSet", "sts.Status", sts.Status)
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

func (r *InstanceReconciler) removeMonitoringDeployment(ctx context.Context, inst *v1alpha1.Instance, log logr.Logger) (bool, error) {
	var monitor appsv1.Deployment
	if err := r.Get(ctx, client.ObjectKey{Namespace: inst.Namespace, Name: GetMonitoringDepName(inst.Name)}, &monitor); err == nil {
		if err := r.Delete(ctx, &monitor); err != nil {
			log.Error(err, "failed to delete monitoring deployment", "InstanceName", inst.Name, "MonitorDeployment", monitor.Name)
			return false, err
		}
		return true, nil
	} else if !apierrors.IsNotFound(err) {
		// retry on other errors.
		return false, err
	}
	return false, nil
}

func (r *InstanceReconciler) stopMonitoringDeployment(ctx context.Context, inst *v1alpha1.Instance, log logr.Logger) error {
	config, err := r.loadConfig(ctx, inst.Namespace)
	if err != nil {
		return err
	}
	images := CloneMap(r.Images)
	if err := r.overrideDefaultImages(config, images, inst, log); err != nil {
		return err
	}
	if err := r.createMonitoringDeployment(ctx, inst, controllers.StoppedReplicaCnt, images); err != nil {
		return err
	}
	return nil
}

func (r *InstanceReconciler) createMonitoringDeployment(ctx context.Context, inst *v1alpha1.Instance, replicas int32, images map[string]string) error {
	deployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: inst.Namespace,
			Name:      GetMonitoringDepName(inst.GetName()),
		},
	}
	if _, err := ctrl.CreateOrUpdate(ctx, r.Client, deployment, func() error {
		if err := ctrlutil.SetOwnerReference(deployment, inst, r.Scheme()); err != nil {
			return err
		}
		monitoringSecret, err := r.getMonitoringSecret(ctx, inst)
		if err != nil {
			return err
		}
		matchLabels := map[string]string{"instance": inst.Name, "task-type": controllers.MonitorTaskType}
		deployment.Spec = appsv1.DeploymentSpec{
			Replicas: &replicas,
			// Must match the agent deployment.
			Selector: &metav1.LabelSelector{
				MatchLabels: matchLabels,
			},
			Strategy: appsv1.DeploymentStrategy{Type: appsv1.RollingUpdateDeploymentStrategyType},
			Template: controllers.MonitoringPodTemplate(inst, monitoringSecret, images),
		}
		deployment.Spec.Template.Labels = matchLabels
		return nil
	}); err != nil {
		return err
	}
	return nil
}

func (r *InstanceReconciler) getMonitoringSecret(ctx context.Context, inst *v1alpha1.Instance) (*corev1.Secret, error) {
	deploymentName := GetMonitoringDepName(inst.Name)
	monitoringUserSecretName := fmt.Sprintf("%s-secret", deploymentName)
	monitoringSecret := &corev1.Secret{}
	if err := r.Get(ctx, client.ObjectKey{Namespace: inst.Namespace, Name: monitoringUserSecretName}, monitoringSecret); err != nil {
		return monitoringSecret, fmt.Errorf("Error getting monitoring secret: %v", err)
	}
	return monitoringSecret, nil
}

func (r *InstanceReconciler) reconcileMonitoring(ctx context.Context, inst *v1alpha1.Instance, log logr.Logger, images map[string]string) (ctrl.Result, error) {
	requeueDuration := 0 * time.Second
	deploymentName := GetMonitoringDepName(inst.Name)
	monitoringUserSecretName := fmt.Sprintf("%s-secret", deploymentName)
	monitoringUser := "gcsql$monitor"

	monitoringSecret := &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: inst.Namespace,
			Name:      monitoringUserSecretName,
		},
	}
	if result, err := ctrl.CreateOrUpdate(ctx, r.Client, monitoringSecret, func() error {
		if err := ctrlutil.SetOwnerReference(monitoringSecret, inst, r.Scheme()); err != nil {
			return err
		}
		if monitoringSecret.Data == nil {
			monitoringSecret.Data = make(map[string][]byte)
		}
		if len(monitoringSecret.Data["username"]) == 0 {
			monitoringSecret.Data["username"] = []byte(monitoringUser)
		}
		if len(monitoringSecret.Data["password"]) == 0 {
			monitoringPass, _ := security.RandOraclePassword()
			monitoringSecret.Data["password"] = []byte(monitoringPass)
		}
		return nil
	}); err != nil {
		return ctrl.Result{}, fmt.Errorf("creating monitoring secret %s/%s: %w", monitoringSecret.Namespace, monitoringSecret.Name, err)
	} else if result != ctrlutil.OperationResultNone {
		// Wait until we are sure the secret is reconciled before creating the user.
		return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
	}

	dbdClient, closeConn, err := r.DatabaseClientFactory.New(ctx, r, inst.GetNamespace(), inst.Name)
	if err != nil {
		return ctrl.Result{}, err
	}
	defer closeConn()

	// Only if the user doesn't exist:
	// create a CDB user with access to all PDBs.
	resp, err := dbdClient.RunSQLPlusFormatted(ctx, &dbdpb.RunSQLPlusCMDRequest{
		Commands: []string{fmt.Sprintf("select username from dba_users where username='%s'", strings.ToUpper(monitoringUser))},
	})
	if err == nil && len(resp.GetMsg()) < 1 {
		if _, err := dbdClient.RunSQLPlus(ctx, &dbdpb.RunSQLPlusCMDRequest{
			Commands: []string{
				fmt.Sprintf("create user %s identified by %s", monitoringUser, string(monitoringSecret.Data["password"])),
				fmt.Sprintf("grant %s to %s container=all", "connect, select any dictionary", monitoringUser),
				fmt.Sprintf("alter user %s set container_data=all container=current", monitoringUser),
			},
			Suppress: true,
		}); err != nil {
			log.Error(err, "Creating the monitoring user failed")
			requeueDuration = 30 * time.Second
		}
	} else if err != nil {
		// Wait for the database to be available.
		requeueDuration = 30 * time.Second
	}

	if err := r.createMonitoringDeployment(ctx, inst, controllers.DefaultReplicaCnt, images); err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{RequeueAfter: requeueDuration}, nil
}

func (r *InstanceReconciler) stopDBStatefulset(ctx context.Context, req ctrl.Request, log logr.Logger) (ctrl.Result, error) {
	var inst v1alpha1.Instance
	if err := r.Get(ctx, req.NamespacedName, &inst); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	sts := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      GetSTSName(inst.Name),
			Namespace: inst.Namespace,
		},
	}
	if err := r.Get(ctx, client.ObjectKeyFromObject(sts), sts); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to get statefulset: %v", err)
	}
	sts.Spec.Replicas = pointer.Int32(controllers.StoppedReplicaCnt)

	baseSTS := &appsv1.StatefulSet{}
	sts.DeepCopyInto(baseSTS)
	if _, err := ctrl.CreateOrUpdate(ctx, r, baseSTS, func() error {
		sts.Spec.DeepCopyInto(&baseSTS.Spec)
		return nil
	}); err != nil {
		log.Error(err, "failed to update the StatefulSet", "sts.Status", sts.Status)
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

func (r *InstanceReconciler) deleteAgentSVC(ctx context.Context, inst *v1alpha1.Instance, log logr.Logger) (ctrl.Result, error) {
	if err := r.Delete(ctx, AgentSVC(*inst)); err != nil && !apierrors.IsNotFound(err) {
		log.Error(err, "failed to delete agent svc", "InstanceName", inst.Name)
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

func (r *InstanceReconciler) deleteDBDSVC(ctx context.Context, inst *v1alpha1.Instance, log logr.Logger) (ctrl.Result, error) {
	if err := r.Delete(ctx, DbDaemonSVC(*inst)); err != nil && !apierrors.IsNotFound(err) {
		log.Error(err, "failed to delete dbdaemon svc", "InstanceName", inst.Name)
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

func (r *InstanceReconciler) deleteDBLoadBalancer(ctx context.Context, inst *v1alpha1.Instance, log logr.Logger) (ctrl.Result, error) {
	if err := r.Delete(ctx, InstanceLB(*inst)); err != nil && !apierrors.IsNotFound(err) {
		log.Error(err, "failed to delete load balancer", "InstanceName", inst.Name)
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

// createDBLoadBalancer returns the load balancer Service for the database.
func (r *InstanceReconciler) createDBLoadBalancer(ctx context.Context, inst *v1alpha1.Instance, applyOpts []client.PatchOption) (*corev1.Service, error) {
	sourceCidrRanges := []string{"0.0.0.0/0"}
	if len(inst.Spec.SourceCidrRanges) > 0 {
		sourceCidrRanges = inst.Spec.SourceCidrRanges
	}
	var svcAnnotations map[string]string

	lbType := corev1.ServiceTypeLoadBalancer
	svcNameFull := getSVCName(*inst)
	svcAnnotations = utils.LoadBalancerAnnotations(inst.Spec.DBLoadBalancerOptions)

	svc := &corev1.Service{
		TypeMeta:   metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String(), Kind: "Service"},
		ObjectMeta: metav1.ObjectMeta{Name: svcNameFull, Namespace: inst.Namespace, Annotations: svcAnnotations},
		Spec: corev1.ServiceSpec{
			Selector: map[string]string{
				"instance":  inst.Name,
				"task-type": controllers.DatabaseTaskType,
			},
			Ports: []corev1.ServicePort{
				{
					Name:       "secure-listener",
					Protocol:   "TCP",
					Port:       consts.SecureListenerPort,
					TargetPort: intstr.FromInt(consts.SecureListenerPort),
				},
				{
					Name:       "ssl-listener",
					Protocol:   "TCP",
					Port:       consts.SSLListenerPort,
					TargetPort: intstr.FromInt(consts.SSLListenerPort),
				},
			},
			Type:                     lbType,
			LoadBalancerIP:           utils.LoadBalancerIpAddress(inst.Spec.DBLoadBalancerOptions),
			LoadBalancerSourceRanges: sourceCidrRanges,
		},
	}

	// Set the Instance resource to own the Service resource.
	if err := ctrl.SetControllerReference(inst, svc, r.Scheme()); err != nil {
		return nil, err
	}

	if err := r.Patch(ctx, svc, client.Apply, applyOpts...); err != nil {
		return nil, err
	}
	return svc, nil
}

func (r *InstanceReconciler) createDataplaneServices(ctx context.Context, inst v1alpha1.Instance, applyOpts []client.PatchOption) (dbDaemonSvc *corev1.Service, agentSvc *corev1.Service, err error) {
	dbDaemonSvc, err = controllers.NewDBDaemonSvc(&inst, r.Scheme())
	if err != nil {
		return nil, nil, err
	}
	if err := r.Patch(ctx, dbDaemonSvc, client.Apply, applyOpts...); err != nil {
		return nil, nil, err
	}

	agentSvc, err = controllers.NewAgentSvc(&inst, r.Scheme())
	if err != nil {
		return nil, nil, err
	} else if agentSvc == nil {
		return dbDaemonSvc, nil, nil
	}
	if agentSvc.Spec.Ports == nil {
		return dbDaemonSvc, agentSvc, nil
	}
	if err := r.Patch(ctx, agentSvc, client.Apply, applyOpts...); err != nil {
		return nil, nil, err
	}

	return dbDaemonSvc, agentSvc, nil
}

// isImageSeeded determines from the service image metadata file if the image is seeded or unseeded.
func (r *InstanceReconciler) isImageSeeded(ctx context.Context, inst *v1alpha1.Instance, log logr.Logger) (bool, error) {
	log.Info("isImageSeeded: requesting image metadata...", "instance", inst.GetName())

	dbClient, closeConn, err := r.DatabaseClientFactory.New(ctx, r, inst.GetNamespace(), inst.GetName())
	if err != nil {
		log.Error(err, "failed to create database client")
		return false, err
	}
	defer closeConn()

	serviceImageMetaData, err := dbClient.FetchServiceImageMetaData(ctx, &dbdpb.FetchServiceImageMetaDataRequest{})
	if err != nil {
		return false, fmt.Errorf("isImageSeeded: failed on FetchServiceImageMetaData call: %v", err)
	}
	if !serviceImageMetaData.SeededImage {
		return false, nil
	}
	return true, nil
}

func (r *InstanceReconciler) overrideDefaultImages(config *v1alpha1.Config, images map[string]string, inst *v1alpha1.Instance, log logr.Logger) error {
	if config != nil {
		log.V(1).Info("customer config loaded", "config", config)

		if config.Spec.Platform != "GCP" && config.Spec.Platform != "BareMetal" && config.Spec.Platform != "Minikube" && config.Spec.Platform != "Kind" {
			return fmt.Errorf("Unsupported platform: %q", config.Spec.Platform)
		}

		// Replace the default images from the global Config, if so requested.
		log.Info("create instance: prep", "images explicitly requested for this config", config.Spec.Images)
		for k, image := range config.Spec.Images {
			log.Info("key value is", "k", k, "image", image)
			if v2, ok := images[k]; ok {
				log.Info("create instance: prep", "replacing", k, "image of", v2, "with global", image)
				images[k] = image
			}
		}
	} else {
		log.Info("no customer specific config found, assuming all defaults")
	}

	// Replace final images with those explicitly set for the Instance.
	if inst.Spec.Images != nil {
		log.Info("create instance: prep", "images explicitly requested for this instance", inst.Spec.Images)
		for k, v1 := range inst.Spec.Images {
			log.Info("k value is ", "key", k)
			if v2, ok := images[k]; ok {
				log.Info("create instance: prep", "replacing", k, "image of", v2, "with instance specific", v1)
				images[k] = v1
			}
		}
	}

	serviceImageDefined := false
	if inst.Spec.Images != nil {
		if _, ok := inst.Spec.Images["service"]; ok {
			serviceImageDefined = true
			log.Info("service image requested via instance", "service image:", inst.Spec.Images["service"])
		}
	}
	if config != nil {
		if _, ok := config.Spec.Images["service"]; ok {
			serviceImageDefined = true
			log.Info("service image requested via config", "service image:", config.Spec.Images["service"])
		}
	}

	if inst.Spec.CDBName == "" {
		return fmt.Errorf("overrideDefaultImages: CDBName isn't defined in the config")
	}
	if !serviceImageDefined {
		return fmt.Errorf("overrideDefaultImages: Service image isn't defined in the config")
	}
	return nil
}

// validateSpec sanity checks a DB Domain input for conflicts.
func validateSpec(inst *v1alpha1.Instance) error {
	// Does DBUniqueName contain DB Domain as a suffix?
	if strings.Contains(inst.Spec.DBUniqueName, ".") {
		domainFromName := strings.SplitN(inst.Spec.DBUniqueName, ".", 2)[1]
		if inst.Spec.DBDomain != "" && domainFromName != inst.Spec.DBDomain {
			return fmt.Errorf("validateSpec: domain %q provided in DBUniqueName %q does not match with provided DBDomain %q",
				domainFromName, inst.Spec.DBUniqueName, inst.Spec.DBDomain)
		}
	}
	if inst.Spec.CDBName != "" {
		if _, err := sql.Identifier(inst.Spec.CDBName); err != nil {
			return fmt.Errorf("validateSpec: cdbName is not valid: %w", err)
		}
	}
	return nil
}
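
// A quick illustration of the DBUniqueName/DBDomain check above, with made-up
// values (not taken from any real deployment):
//
//	inst := &v1alpha1.Instance{}
//	inst.Spec.DBUniqueName = "orcl_dr.us-east1.example.com" // domain suffix: "us-east1.example.com"
//	inst.Spec.DBDomain = "example.com"                      // mismatch, so validateSpec returns an error
//	err := validateSpec(inst)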

// statusProgress tracks the progress of an ongoing instance creation and returns the progress in terms of percentage.
func (r *InstanceReconciler) statusProgress(ctx context.Context, ns, name string, log logr.Logger) (int, error) {
	var sts appsv1.StatefulSetList
	if err := r.List(ctx, &sts, client.InNamespace(ns)); err != nil {
		log.Error(err, "failed to get a list of StatefulSets to check status")
		return 0, err
	}

	if len(sts.Items) < 1 {
		return 0, fmt.Errorf("failed to find a StatefulSet, found: %d", len(sts.Items))
	}

	// In theory a user should not be running any other StatefulSet in the
	// namespace, but to be on the safe side, iterate over all until we find ours.
	var foundSts *appsv1.StatefulSet
	for index, s := range sts.Items {
		if s.Name == name {
			foundSts = &sts.Items[index]
		}
	}
	if foundSts == nil {
		return 0, fmt.Errorf("failed to find the right StatefulSet %s (out of %d)", name, len(sts.Items))
	}
	log.Info("found the right StatefulSet", "foundSts", &foundSts.Name, "sts.Status.CurrentReplicas", &foundSts.Status.CurrentReplicas, "sts.Status.ReadyReplicas", foundSts.Status.ReadyReplicas)

	if foundSts.Status.CurrentReplicas != 1 {
		return 10, fmt.Errorf("StatefulSet is not ready yet? (failed to find the expected number of current replicas): %d", foundSts.Status.CurrentReplicas)
	}
	if foundSts.Status.ReadyReplicas != 1 {
		return 50, fmt.Errorf("StatefulSet is not ready yet? (failed to find the expected number of ready replicas): %d", foundSts.Status.ReadyReplicas)
	}

	var pods corev1.PodList
	if err := r.List(ctx, &pods, client.InNamespace(ns), client.MatchingLabels{"statefulset": name}); err != nil {
		log.Error(err, "failed to get a list of Pods to check status")
		return 60, err
	}

	if len(pods.Items) < 1 {
		return 65, fmt.Errorf("failed to find enough pods, found: %d pods", len(pods.Items))
	}

	var foundPod *corev1.Pod
	for index, p := range pods.Items {
		if p.Name == name+"-0" {
			foundPod = &pods.Items[index]
		}
	}
	if foundPod == nil {
		return 75, fmt.Errorf("failed to find the right Pod %s (out of %d)", name+"-0", len(pods.Items))
	}
	log.Info("found the right Pod", "pod.Name", &foundPod.Name, "pod.Status", foundPod.Status.Phase, "#containers", len(foundPod.Status.ContainerStatuses))

	if foundPod.Status.Phase != "Running" {
		return 85, fmt.Errorf("failed to find the right Pod %s in status Running: %s", name+"-0", foundPod.Status.Phase)
	}
	for _, podCondition := range foundPod.Status.Conditions {
		if podCondition.Type == "Ready" && podCondition.Status == "False" {
			log.Info("statusProgress: podCondition.Type Ready is False")
			return 85, fmt.Errorf("failed to find the right Pod %s in status Running: %s", name+"-0", foundPod.Status.Phase)
		}
		if podCondition.Type == "ContainersReady" && podCondition.Status == "False" {
			msg := "statusProgress: podCondition.Type ContainersReady is False"
			log.Info(msg)
			return 85, errors.New(msg)
		}
	}

	for _, c := range foundPod.Status.ContainerStatuses {
		if !c.Ready {
			msg := fmt.Sprintf("container %s is not ready", c.Name)
			log.Info(msg)
			return 85, errors.New(msg)
		}
		msg := fmt.Sprintf("container %s is ready", c.Name)
		log.Info(msg)
	}

	for _, c := range foundPod.Status.InitContainerStatuses {
		if !c.Ready {
			msg := fmt.Sprintf("init container %s is not ready", c.Name)
			log.Info(msg)
			return 85, errors.New(msg)
		}
		msg := fmt.Sprintf("init container %s is ready", c.Name)
		log.Info(msg)
	}

	log.Info("Stateful set creation is complete")
	return 100, nil
}

// IsPatchingStateMachineEntryCondition reports whether the patching state
// machine should be entered for this instance.
func IsPatchingStateMachineEntryCondition(enabledServices map[commonv1alpha1.Service]bool, activeImages map[string]string, spImages map[string]string, lastFailedImages map[string]string, instanceReadyCond *v1.Condition, dbInstanceCond *v1.Condition) bool {
	if !(enabledServices[commonv1alpha1.Patching] &&
		instanceReadyCond != nil && dbInstanceCond != nil &&
		k8s.ConditionStatusEquals(instanceReadyCond, v1.ConditionTrue)) {
		return false
	}
	if !reflect.DeepEqual(activeImages, spImages) &&
		(lastFailedImages == nil || !reflect.DeepEqual(lastFailedImages, spImages)) {
		return true
	}
	return false
}

func (r *InstanceReconciler) isOracleUpAndRunning(ctx context.Context, inst *v1alpha1.Instance, namespace string, log logr.Logger) (bool, error) {
	status, err := CheckStatusInstanceFunc(ctx, r, r.DatabaseClientFactory, inst.Name, inst.Spec.CDBName, inst.Namespace, "", controllers.GetDBDomain(inst), log)
	if err != nil {
		log.Info("dbdaemon startup still in progress, waiting")
		return false, nil
	}
	if status != controllers.StatusReady {
		log.Info("Oracle startup still in progress, waiting")
		return false, nil
	}
	return true, nil
}

func (r *InstanceReconciler) updateDatabaseIncarnationStatus(ctx context.Context, inst *v1alpha1.Instance, log logr.Logger) error {
	incResp, err := controllers.FetchDatabaseIncarnation(ctx, r, r.DatabaseClientFactory, inst.Namespace, inst.Name)
	if err != nil {
		return fmt.Errorf("failed to fetch current database incarnation: %v", err)
	}

	if inst.Status.CurrentDatabaseIncarnation != incResp.Incarnation {
		inst.Status.LastDatabaseIncarnation = inst.Status.CurrentDatabaseIncarnation
	}
	inst.Status.CurrentDatabaseIncarnation = incResp.Incarnation
	return nil
}

// CloneMap returns a shallow copy of the provided map.
func CloneMap(source map[string]string) map[string]string {
	clone := make(map[string]string, len(source))
	for key, value := range source {
		clone[key] = value
	}
	return clone
}

// startDatabasePatching initiates a config_agent_helpers ApplyDataPatch() call.
// It creates an LRO job named "DatabasePatch_%s", instance.GetUID(),
// making the method idempotent (per instance).
// It returns an error on failure, nil on success.
func (r *InstanceReconciler) startDatabasePatching(req ctrl.Request, ctx context.Context, inst v1alpha1.Instance, log logr.Logger) error {
	log.Info("startDatabasePatching initiated")

	// Call async ApplyDataPatch.
	log.Info("config_agent_helpers.ApplyDataPatch", "LRO", lroPatchingOperationID(inst))
	resp, err := controllers.ApplyDataPatch(ctx, r, r.DatabaseClientFactory, inst.Namespace, inst.Name, controllers.ApplyDataPatchRequest{
		LroInput: &controllers.LROInput{OperationId: lroPatchingOperationID(inst)},
	})
	if err != nil {
		return fmt.Errorf("failed on ApplyDataPatch gRPC call: %w", err)
	}

	log.Info("config_agent_helpers.ApplyDataPatch", "response", resp)
	return nil
}

// isDatabasePatchingDone checks the patching LRO job status.
// It returns (true, nil) if the job is done,
// (false, nil) if the job is still in progress,
// and (false, err) if the job failed.
func (r *InstanceReconciler) isDatabasePatchingDone(ctx context.Context, req ctrl.Request, inst v1alpha1.Instance, log logr.Logger) (bool, error) {
	// Get the operation id.
	id := lroPatchingOperationID(inst)
	operation, err := controllers.GetLROOperation(ctx, r.DatabaseClientFactory, r, id, req.Namespace, inst.Name)
	if err != nil {
		log.Info("GetLROOperation returned error", "error", err)
		return false, nil
	}
	log.Info("GetLROOperation", "response", operation)
	if !operation.Done {
		// Still waiting.
		return false, nil
	}

	log.Info("LRO is DONE", "id", id)
	if err := controllers.DeleteLROOperation(ctx, r.DatabaseClientFactory, r, id, req.Namespace, inst.Name); err != nil {
		return false, fmt.Errorf("DeleteLROOperation returned an error: %w", err)
	}

	// The remote LRO completed unsuccessfully.
	if operation.GetError() != nil {
		return false, fmt.Errorf("config_agent.ApplyDataPatch() failed: %v", operation.GetError())
	}
	return true, nil
}

func lroPatchingOperationID(instance v1alpha1.Instance) string {
	return fmt.Sprintf("DatabasePatch_%s", instance.GetUID())
}
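
// The two patching helpers above are meant to be driven from the patching
// state machine: submit the LRO once, then poll it on subsequent reconciles.
// A minimal usage sketch (the surrounding reconcile plumbing is illustrative,
// not part of this file):
//
//	// In the reconcile step that enters patching:
//	if err := r.startDatabasePatching(req, ctx, inst, log); err != nil {
//		return ctrl.Result{}, err
//	}
//
//	// In later reconciles, poll until the LRO finishes:
//	done, err := r.isDatabasePatchingDone(ctx, req, inst, log)
//	switch {
//	case err != nil:
//		// The patch LRO failed; surface the error.
//	case !done:
//		// Still running; requeue and check again later.
//	default:
//		// Patching finished successfully; continue the state machine.
//	}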

// AcquireInstanceMaintenanceLock gives the caller exclusive maintenance
// access to the specified instance object.
// 'inst' points to an existing instance object (it will be updated after the call).
// 'owner' identifies the owning controller, e.g. 'instancecontroller'.
// Convention:
// If the call succeeds the caller can safely assume
// that it has exclusive access now.
// If the call fails the caller needs to retry acquiring the lock.
//
// The function is idempotent: the caller can acquire the lock multiple times.
//
// Note: the call will commit the instance object to k8s (with all changes),
// updating the supplied 'inst' object and making all other
// references stale.
func AcquireInstanceMaintenanceLock(ctx context.Context, k8sClient client.Client, inst *v1alpha1.Instance, owner string) error {
	var result error = nil
	if inst.Status.LockedByController == "" {
		inst.Status.LockedByController = owner
		result = nil
	} else if inst.Status.LockedByController == owner {
		result = nil
	} else {
		result = fmt.Errorf("requested owner: %s, instance already locked by %v", owner, inst.Status.LockedByController)
	}
	// Will return an error if 'inst' is stale.
	if err := k8sClient.Status().Update(ctx, inst); err != nil {
		return fmt.Errorf("requested owner: %s, failed to update the instance status: %v", owner, err)
	}
	return result
}

// ReleaseInstanceMaintenanceLock releases exclusive maintenance
// access to the specified instance object.
// 'inst' points to an existing instance object (it will be updated after the call).
// 'owner' identifies the owning controller, e.g. 'instancecontroller'.
// Convention:
// If the call succeeds the caller can safely assume
// that the lock was released.
// If the call fails the caller needs to retry releasing the lock.
//
// The call is idempotent: the caller can release the lock multiple times.
// If the caller does not own the lock the call returns success
// without affecting the ownership.
//
// Note: the call will commit the instance object to k8s (with all changes),
// updating the supplied 'inst' object and making all other
// references stale.
func ReleaseInstanceMaintenanceLock(ctx context.Context, k8sClient client.Client, inst *v1alpha1.Instance, owner string) error {
	var result error = nil
	if inst.Status.LockedByController == "" {
		result = nil
	} else if inst.Status.LockedByController == owner {
		inst.Status.LockedByController = ""
		result = nil
	} else {
		// Return success even if the lock is owned by someone else.
		result = nil
	}
	// Will return an error if 'inst' is stale.
	if err := k8sClient.Status().Update(ctx, inst); err != nil {
		return fmt.Errorf("requested owner: %s, failed to update the instance status: %v", owner, err)
	}
	return result
}
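
// A typical maintenance-lock usage pattern, sketched for illustration (the
// surrounding reconcile code is assumed, not defined in this file):
//
//	if err := AcquireInstanceMaintenanceLock(ctx, r.Client, inst, "instancecontroller"); err != nil {
//		// Another controller holds the lock; retry later.
//		return ctrl.Result{RequeueAfter: time.Minute}, nil
//	}
//	defer func() {
//		// Best-effort release; per the convention above this is idempotent and
//		// returns success even if the lock is held by another controller.
//		_ = ReleaseInstanceMaintenanceLock(ctx, r.Client, inst, "instancecontroller")
//	}()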

func (r *InstanceReconciler) handleResize(ctx context.Context, inst *v1alpha1.Instance, instanceReadyCond *v1.Condition, dbInstanceCond *v1.Condition, sp controllers.StsParams, applyOpts []client.PatchOption, log logr.Logger) (ctrl.Result, error) {
	if !k8s.ConditionStatusEquals(instanceReadyCond, v1.ConditionTrue) &&
		!k8s.ConditionStatusEquals(dbInstanceCond, v1.ConditionTrue) &&
		!k8s.ConditionReasonEquals(instanceReadyCond, k8s.ResizingInProgress) {
		return ctrl.Result{}, nil
	}

	sts := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      sp.StsName,
			Namespace: inst.Namespace,
		},
	}
	if err := r.Get(ctx, client.ObjectKeyFromObject(sts), sts); err != nil && !apierrors.IsNotFound(err) {
		return ctrl.Result{}, fmt.Errorf("failed to get statefulset: %v", err)
	} else if apierrors.IsNotFound(err) {
		log.Info("Recreating Stateful set")
		k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.ResizingInProgress, "Recreating statefulset")
		if _, err := r.createStatefulSet(ctx, inst, sp, applyOpts, log); err != nil {
			return ctrl.Result{}, err
		}
		return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
	}

	dbContainer := findContainer(sts.Spec.Template.Spec.Containers, controllers.DatabaseContainerName)
	if dbContainer == nil {
		return ctrl.Result{}, fmt.Errorf("could not find database container in pod template")
	}

	// CPU/Memory resize
	if !cmp.Equal(inst.Spec.DatabaseResources, dbContainer.Resources) {
		log.Info("Instance CPU/MEM resize required")
		k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.ResizingInProgress, "Resizing cpu/memory")
		_, err := ctrl.CreateOrUpdate(ctx, r.Client, sts, func() error {
			dbContainer := findContainer(sts.Spec.Template.Spec.Containers, controllers.DatabaseContainerName)
			if dbContainer == nil {
				return fmt.Errorf("could not find database container in pod template")
			}
			dbContainer.Resources = inst.Spec.DatabaseResources
			return nil
		})
		if err != nil {
			return ctrl.Result{}, fmt.Errorf("failed to update statefulset resources: %v", err)
		}
		return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
	}

	newSts, err := r.buildStatefulSet(ctx, inst, sp, nil, log)
	if err != nil {
		return ctrl.Result{}, err
	}
	done, err := tryResizeDisksOf(ctx, r.Client, newSts, log)
	if err != nil {
		return ctrl.Result{}, err
	} else if !done {
		k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.ResizingInProgress, "Resizing disk")
		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
	}

	if k8s.ConditionReasonEquals(instanceReadyCond, k8s.ResizingInProgress) {
		ready, msg := IsReadyWithObj(sts)
		if ready && cmp.Equal(inst.Spec.DatabaseResources, dbContainer.Resources) {
			k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionTrue, k8s.CreateComplete, msg)
			return ctrl.Result{Requeue: true}, nil
		}
		if err := utils.VerifyPodsStatus(ctx, r.Client, sts); errors.Is(err, utils.ErrPodUnschedulable) {
			return ctrl.Result{}, fmt.Errorf("Unschedulable pod %v", err)
		} else if errors.Is(err, utils.ErrNoResources) {
			return ctrl.Result{}, fmt.Errorf("Insufficient Resources %v", err)
		}
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

func tryResizeDisksOf(ctx context.Context, c client.Client, newSts *appsv1.StatefulSet, log logr.Logger) (bool, error) {
	oldSts := &appsv1.StatefulSet{}
	key := client.ObjectKeyFromObject(newSts)
	if err := c.Get(ctx, key, oldSts); err != nil {
		if apierrors.IsNotFound(err) {
			// No existing sts. We only delete the sts after all PVCs have been
			// resized, so this is either a new sts or the PVCs have already been
			// resized. Still return false here so that the request is requeued
			// and the sts is recreated in the following cycle.
			return false, nil
		}
		return false, fmt.Errorf("error getting statefulset [%v]: %v", key, err)
	}

	if !oldSts.DeletionTimestamp.IsZero() {
		// The sts has been requested for deletion; wait for it to be completely gone.
		return false, nil
	}

	changedDisks := FilterDiskWithSizeChanged(
		oldSts.Spec.VolumeClaimTemplates,
		newSts.Spec.VolumeClaimTemplates,
		log,
	)

	if len(changedDisks) == 0 {
		// No disk has its size changed.
		return true, nil
	}

	// Sanity check: all storage classes can expand.
	if err := PvcsCanBeExpanded(ctx, c, newSts, changedDisks); err != nil {
		return false, fmt.Errorf("pvcs may not be expanded: %w", err)
	}

	// Actually do the update.
	done, err := resizePvcs(ctx, c, newSts, changedDisks, log)
	if err != nil {
		return false, err
	}
	if !done {
		return false, nil
	}

	// Resize done, now delete the sts.
	if err := c.Delete(ctx, oldSts); err != nil {
		return false, fmt.Errorf("error while deleting sts [%v]: %v", key, err)
	}
	return false, nil
}

// resizePvcs resizes the PVCs matching the provided PVC templates. It returns
// true if resizing has completed; false if resize requests have been made but
// PVCs are still being resized; or an error if one occurred during the
// resizing process.
func resizePvcs(
	ctx context.Context,
	c client.Client,
	sts *appsv1.StatefulSet,
	disks []*corev1.PersistentVolumeClaim,
	log logr.Logger,
) (bool, error) {
	done := true
	var requested []string
	var completed []string

	for i := 0; i < int(*sts.Spec.Replicas); i++ {
		for _, pvc := range disks {
			key := utils.ObjectKeyOf(sts, pvc, i)
			newSize := *pvc.Spec.Resources.Requests.Storage()
			pvc := &corev1.PersistentVolumeClaim{}
			if err := c.Get(ctx, key, pvc); err != nil {
				if apierrors.IsNotFound(err) {
					// Ignore a non-existent PVC since it will be created.
					continue
				} else {
					return false, fmt.Errorf("error getting pvc [%v]: %v", key, err)
				}
			}

			if newSize.Equal(*pvc.Spec.Resources.Requests.Storage()) {
				// The spec has been updated before; check the status.
				if !newSize.Equal(*pvc.Status.Capacity.Storage()) {
					// The status is not equal yet, meaning the resizing is still in progress.
					done = false
				} else {
					completed = append(completed, key.String())
				}
			} else {
				// The spec is not updated yet; update it.
				done = false
				oldCliObj := pvc.DeepCopyObject().(client.Object)
				pvc.Spec.Resources.Requests[corev1.ResourceStorage] = newSize
				if err := Patch(ctx, c, pvc, oldCliObj); err != nil {
					return false, fmt.Errorf("error resizing pvc [%v]: %v", key, err)
				} else {
					requested = append(requested, key.String())
				}
			}
		}
	}
	if len(requested) != 0 || len(completed) != 0 {
		log.Info(
			fmt.Sprintf(
				"PVC resizing, requested [%v], completed [%v]",
				strings.Join(requested, ", "), strings.Join(completed, ", ")))
	}
	return done, nil
}

// IsReadyWithObj returns true if the statefulset has a non-zero number of
// desired replicas and has the same number of actual pods running and ready.
func IsReadyWithObj(sts *appsv1.StatefulSet) (ready bool, msg string) {
	var want int32
	if sts.Spec.Replicas != nil {
		want = *sts.Spec.Replicas
	}
	have := sts.Status.ReadyReplicas
	if want == have {
		if want == 0 {
			return false, fmt.Sprintf("Statefulset %s/%s has zero replicas", sts.Namespace, sts.Name)
		}
		return true, fmt.Sprintf("Statefulset %s/%s is running", sts.Namespace, sts.Name)
	}
	return false, fmt.Sprintf("StatefulSet is not ready (current replicas: %d expected replicas: %d)", have, want)
}

func (r *InstanceReconciler) buildStatefulSet(ctx context.Context, inst *v1alpha1.Instance, sp controllers.StsParams, applyOpts []client.PatchOption, log logr.Logger) (*appsv1.StatefulSet, error) {
	newPVCs, err := controllers.NewPVCs(sp)
	if err != nil {
		log.Error(err, "NewPVCs failed")
		return nil, err
	}
	newPodTemplate := controllers.NewPodTemplate(sp, *inst)
	sts, err := controllers.NewSts(sp, newPVCs, newPodTemplate)
	if err != nil {
		log.Error(err, "failed to create a StatefulSet", "sts", sts)
		return nil, err
	}
	log.Info("StatefulSet constructed", "sts", sts, "sts.Status", sts.Status, "inst.Status", inst.Status)
	return sts, nil
}

func findContainer(containers []corev1.Container, name string) *corev1.Container {
	for i, c := range containers {
		if c.Name == name {
			return &containers[i]
		}
	}
	return nil
}

// FilterDiskWithSizeChanged compares an old STS to a new STS and identifies volumes whose size changed from old to new.
func FilterDiskWithSizeChanged(old, new []corev1.PersistentVolumeClaim, log logr.Logger) []*corev1.PersistentVolumeClaim {
	oldDisks := make(map[string]*resource.Quantity)
	changedDisks := make([]*corev1.PersistentVolumeClaim, 0, len(new))
	for _, c := range old {
		oldDisks[c.GetName()] = c.Spec.Resources.Requests.Storage()
	}
	sb := strings.Builder{}
	sb.WriteString("Detected disks with new size: ")
	for i, c := range new {
		newSize := c.Spec.Resources.Requests.Storage()
		oldSize, ok := oldDisks[c.GetName()]
		if !ok {
			sb.WriteString(fmt.Sprintf("New disk [%v: %v], ", c.GetName(), newSize.String()))
			changedDisks = append(changedDisks, &new[i])
		} else if !oldSize.Equal(*newSize) {
			sb.WriteString(fmt.Sprintf("[%v:%v->%v], ", c.GetName(), oldSize.String(), newSize.String()))
			changedDisks = append(changedDisks, &new[i])
		}
	}
	if len(changedDisks) != 0 {
		log.Info(sb.String())
	}
	return changedDisks
}

// PvcsCanBeExpanded checks that every PVC has a storage class that can be
// expanded, and returns an error if any one PVC cannot be expanded.
func PvcsCanBeExpanded(ctx context.Context, r client.Reader, sts *appsv1.StatefulSet, pvcs []*corev1.PersistentVolumeClaim) error {
	for i := 0; i < int(*sts.Spec.Replicas); i++ {
		for _, pvc := range pvcs {
			// Need to get the actual PVC from kubernetes since an empty storage class
			// at creation time could mean either a manually provisioned PVC or the
			// default storage class.
			key := utils.ObjectKeyOf(sts, pvc, i)
			if err := CheckSinglePvc(ctx, r, key); err != nil {
				return err
			}
		}
	}
	return nil
}

// CheckSinglePvc checks the spec of the PVC identified by key and returns an
// error if that PVC cannot be expanded.
func CheckSinglePvc(ctx context.Context, r client.Reader, key client.ObjectKey) error {
	tpvc := &corev1.PersistentVolumeClaim{}
	if err := r.Get(ctx, key, tpvc); err != nil {
		if apierrors.IsNotFound(err) {
			// A non-existent PVC can be created at the new size.
			return nil
		} else {
			return fmt.Errorf("error getting pvc [%v]: %v", key, err)
		}
	}
	if tpvc.Spec.StorageClassName == nil || *tpvc.Spec.StorageClassName == "" {
		return fmt.Errorf("cannot resize manually provisioned pvc [%v]", key)
	}
	scName := *tpvc.Spec.StorageClassName
	sc := &storagev1.StorageClass{}
	if err := r.Get(ctx, client.ObjectKey{Name: scName}, sc); err != nil {
		return fmt.Errorf("error getting storageclass [%v]: %v", scName, err)
	}
	if sc.AllowVolumeExpansion == nil || !*sc.AllowVolumeExpansion {
		return fmt.Errorf("storageclass [%v] does not allow expansion for volume [%v]", scName, tpvc.GetName())
	}
	return nil
}

// Patch attempts to patch the given object.
func Patch(ctx context.Context, cli client.Client, newCliObj client.Object, oldCliObj client.Object) error {
	// Make sure we're comparing objects with the same resourceVersion so that
	// the patch data (i.e., the diff) doesn't include a resourceVersion.
	// Otherwise, we could get a "the object has been modified; please apply
	// your changes to the latest version and try again" error if the
	// resourceVersion we're using is out of date.
	oldCliObj.SetResourceVersion(newCliObj.GetResourceVersion())

	patch := client.MergeFrom(oldCliObj)

	// The client.Patch function will make a patch request even if the patch
	// data is empty. So we'll check to see if a request is needed first to
	// avoid making empty requests.
	specOrMetaChanged, statusChanged, err := isObjectChanged(ctx, patch, newCliObj)
	if err != nil {
		return err
	}

	if specOrMetaChanged {
		newCloned := newCliObj
		if statusChanged {
			// Patch() will change the object passed in. We'll pass in a cloned
			// object so we still have the original for the status update below.
			newCloned = newCliObj.DeepCopyObject().(client.Object)
		}
		if err := cli.Patch(ctx, newCloned, patch); err != nil {
			return err
		}
	}
	if statusChanged {
		if err := cli.Status().Patch(ctx, newCliObj, patch); err != nil {
			return err
		}
	}

	// Rebind the local reference to the updated object. Note that this only
	// affects the local variable; the caller's 'old' object is not modified,
	// so subsequent calls must pass a fresh copy of the old object.
	oldCliObj = newCliObj.DeepCopyObject().(client.Object)
	return nil
}
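
// A minimal usage sketch for Patch, mirroring how resizePvcs calls it (the
// PVC key and size below are illustrative):
//
//	pvc := &corev1.PersistentVolumeClaim{}
//	if err := c.Get(ctx, key, pvc); err != nil {
//		return err
//	}
//	old := pvc.DeepCopyObject().(client.Object) // snapshot before mutating
//	pvc.Spec.Resources.Requests[corev1.ResourceStorage] = resource.MustParse("100Gi")
//	// Sends a merge patch (spec/metadata and/or status) only if something changed.
//	if err := Patch(ctx, c, pvc, old); err != nil {
//		return err
//	}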

func isObjectChanged(ctx context.Context, patch client.Patch, obj client.Object) (specOrMetaChanged, statusChanged bool, err error) {
	data, err := patch.Data(obj)
	if err != nil {
		return false, false, err
	}
	var result map[string]interface{}
	if err := json.Unmarshal(data, &result); err != nil {
		return false, false, err
	}
	// A merge patch that only contains "status" means nothing in spec/metadata
	// changed; any other non-empty patch implies a spec or metadata change.
	_, statusChanged = result["status"]
	specOrMetaChanged = len(result) > 0 && !(len(result) == 1 && statusChanged)
	return specOrMetaChanged, statusChanged, nil
}

// GetSTSName returns the name of the StatefulSet for the given instance.
func GetSTSName(instanceName string) string {
	return fmt.Sprintf(controllers.StsName, instanceName)
}

// GetMonitoringDepName returns the name of the monitoring Deployment for the given instance.
func GetMonitoringDepName(instName string) string {
	return fmt.Sprintf("%s-monitor", instName)
}

func getSVCName(instance v1alpha1.Instance) string {
	return fmt.Sprintf(controllers.SvcName, instance.Name)
}

// InstanceLB returns a skeleton Service object identifying the instance load balancer.
func InstanceLB(inst v1alpha1.Instance) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: inst.GetNamespace(),
			Name:      getSVCName(inst),
		},
	}
}

// AgentSVC returns a skeleton Service object identifying the agent service.
func AgentSVC(inst v1alpha1.Instance) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: inst.GetNamespace(),
			Name:      fmt.Sprintf(controllers.AgentSvcName, inst.Name),
		},
	}
}

// DbDaemonSVC returns a skeleton Service object identifying the dbdaemon service.
func DbDaemonSVC(inst v1alpha1.Instance) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: inst.GetNamespace(),
			Name:      fmt.Sprintf(controllers.DbdaemonSvcName, inst.Name),
		},
	}
}