oracle/controllers/instancecontroller/instance_controller_standby.go (495 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package instancecontroller import ( "context" "encoding/json" "errors" "fmt" "sort" "strings" "time" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" commonv1alpha1 "github.com/GoogleCloudPlatform/elcarro-oracle-operator/common/api/v1alpha1" v1alpha1 "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/api/v1alpha1" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/controllers" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/k8s" ) const ( standbyErrorRetryInterval = time.Second * 60 // StandbyReconcileInterval is the reconcile interval for a standby instance. StandbyReconcileInterval = time.Second * 60 createStandby = "CreateStandby" ) func isStandbyDR(inst *v1alpha1.Instance) bool { if inst.Spec.ReplicationSettings != nil { return true } cond := k8s.FindCondition(inst.Status.Conditions, k8s.StandbyDRReady) return cond != nil && cond.Status != metav1.ConditionTrue } func (r *InstanceReconciler) standbyStateMachine(ctx context.Context, inst *v1alpha1.Instance, log logr.Logger) (ctrl.Result, error) { log.Info("Running standby DR state machine") // Our initial state is equivalent to StandbyVerifyFailed. See state machine. state := k8s.StandbyDRVerifyFailed if standbyCond := k8s.FindCondition(inst.Status.Conditions, k8s.StandbyDRReady); standbyCond != nil { state = standbyCond.Reason } switch state { case k8s.StandbyDRVerifyFailed: externalErrMsgs, err := r.verifySettings(ctx, inst) if err != nil { log.Error(err, "verify settings failed") r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRVerifyFailed, "validate replication settings failed", internalErrToMsg(err)) return ctrl.Result{RequeueAfter: standbyErrorRetryInterval}, nil } if len(externalErrMsgs) > 0 { r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRVerifyFailed, "validate replication settings failed", externalErrMsgs...) return ctrl.Result{RequeueAfter: standbyErrorRetryInterval}, nil } r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRVerifyCompleted, "validate replication settings completed") return ctrl.Result{Requeue: true}, nil case k8s.StandbyDRVerifyCompleted: inst.Status.CurrentReplicationSettings = inst.Spec.ReplicationSettings r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRCreateInProgress, "create standby instance in progress") return ctrl.Result{Requeue: true}, nil case k8s.StandbyDRCreateInProgress: if inst.Spec.ReplicationSettings == nil { r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRCreateInProgress, replicationSettingsNilErr(inst.Status.CurrentReplicationSettings)) return ctrl.Result{}, nil } inst.Status.CurrentReplicationSettings = inst.Spec.ReplicationSettings operationId := lroOperationID(createStandby, inst) credentialReq, err := toCredentialReq(inst.Spec.ReplicationSettings.PrimaryUser) if err != nil { r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRCreateInProgress, "create standby instance failed", internalErrToMsg(err)) return ctrl.Result{RequeueAfter: standbyErrorRetryInterval}, nil } operation, err := controllers.CreateStandby(ctx, r, r.DatabaseClientFactory, inst.Namespace, inst.Name, controllers.CreateStandbyRequest{ PrimaryHost: inst.Spec.ReplicationSettings.PrimaryHost, PrimaryPort: inst.Spec.ReplicationSettings.PrimaryPort, PrimaryService: inst.Spec.ReplicationSettings.PrimaryServiceName, PrimaryUser: inst.Spec.ReplicationSettings.PrimaryUser.Name, PrimaryCredential: credentialReq, BackupGcsPath: inst.Spec.ReplicationSettings.BackupURI, StandbyDbDomain: inst.Spec.DBDomain, StandbyDbUniqueName: inst.Spec.DBUniqueName, StandbyLogDiskSize: findLogDiskSize(inst), LroInput: &controllers.LROInput{OperationId: operationId}, }) if err != nil { r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRCreateFailed, "create standby instance failed", internalErrToMsg(err)) return ctrl.Result{}, nil } if operation.GetError() != nil { controllers.DeleteLROOperation(ctx, r.DatabaseClientFactory, r, operationId, inst.Namespace, inst.Name) r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRCreateFailed, "create standby instance failed", operation.GetError().GetMessage()) return ctrl.Result{}, nil } else if !operation.Done { log.Info("create standby still in progress") r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRCreateInProgress, "create standby instance in progress") return ctrl.Result{RequeueAfter: StandbyReconcileInterval}, nil } r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRCreateCompleted, "create standby instance completed") return ctrl.Result{}, nil case k8s.StandbyDRCreateFailed: return ctrl.Result{}, nil case k8s.StandbyDRCreateCompleted: if inst.Spec.ReplicationSettings == nil { r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRCreateCompleted, replicationSettingsNilErr(inst.Status.CurrentReplicationSettings)) return ctrl.Result{}, nil } inst.Status.CurrentReplicationSettings = inst.Spec.ReplicationSettings if err := r.reconcileDataGuard(ctx, inst); err != nil { r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRSetUpDataGuardFailed, "set up Data Guard failed", internalErrToMsg(err)) return ctrl.Result{RequeueAfter: standbyErrorRetryInterval}, nil } r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRSetUpDataGuardCompleted, "set up Data Guard completed") return ctrl.Result{Requeue: true}, nil case k8s.StandbyDRSetUpDataGuardFailed: if inst.Spec.ReplicationSettings == nil { r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRSetUpDataGuardFailed, replicationSettingsNilErr(inst.Status.CurrentReplicationSettings)) return ctrl.Result{}, nil } inst.Status.CurrentReplicationSettings = inst.Spec.ReplicationSettings if err := r.reconcileDataGuard(ctx, inst); err != nil { r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRSetUpDataGuardFailed, "set up Data Guard failed", internalErrToMsg(err)) return ctrl.Result{RequeueAfter: standbyErrorRetryInterval}, nil } r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRSetUpDataGuardCompleted, "set up Data Guard completed") return ctrl.Result{Requeue: true}, nil case k8s.StandbyDRSetUpDataGuardCompleted: r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRDataGuardReplicationInProgress, "Data Guard data replication in progress") return ctrl.Result{Requeue: true}, nil case k8s.StandbyDRDataGuardReplicationInProgress: if inst.Spec.ReplicationSettings == nil { if err := r.reconcilePromoteStandby(ctx, inst, log); err != nil { r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRPromoteFailed, "promote standby failed", internalErrToMsg(err)) return ctrl.Result{}, nil } r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRPromoteCompleted, "promote standby completed") return ctrl.Result{Requeue: true}, nil } if err := r.reconcileDataGuard(ctx, inst); err != nil { r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRDataGuardReplicationInProgress, "Data Guard data replication in progress with errors", internalErrToMsg(err)) r.updateDataGuardStatus(ctx, inst, standbyErrorRetryInterval, log) return ctrl.Result{RequeueAfter: standbyErrorRetryInterval}, nil } r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRDataGuardReplicationInProgress, "Data Guard data replication in progress") r.updateDataGuardStatus(ctx, inst, StandbyReconcileInterval, log) return ctrl.Result{RequeueAfter: StandbyReconcileInterval}, nil case k8s.StandbyDRPromoteFailed: if err := r.reconcilePromoteStandby(ctx, inst, log); err != nil { r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRPromoteFailed, "promote standby failed", internalErrToMsg(err)) return ctrl.Result{RequeueAfter: standbyErrorRetryInterval}, nil } r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRPromoteCompleted, "promote standby completed") return ctrl.Result{Requeue: true}, nil case k8s.StandbyDRPromoteCompleted, k8s.StandbyDRBootstrapFailed: inst.Status.CurrentReplicationSettings = nil inst.Status.DataGuardOutput = nil err := r.bootstrapStandby(ctx, inst) if err != nil { r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionFalse, k8s.StandbyDRBootstrapFailed, "bootstrap standby failed", internalErrToMsg(err)) return ctrl.Result{RequeueAfter: standbyErrorRetryInterval}, nil } r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionTrue, k8s.StandbyDRBootstrapCompleted, "bootstrap standby completed") k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, metav1.ConditionTrue, k8s.CreateComplete, "bootstrap standby completed") k8s.InstanceUpsertCondition(&inst.Status, k8s.DatabaseInstanceReady, metav1.ConditionTrue, k8s.CreateComplete, "bootstrap standby completed") return ctrl.Result{Requeue: true}, nil case k8s.StandbyDRBootstrapCompleted: r.updateStandbyDataReplicationStatus(ctx, inst, metav1.ConditionTrue, k8s.StandbyDRBootstrapCompleted, "bootstrap standby completed") k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, metav1.ConditionTrue, k8s.CreateComplete, "bootstrap standby completed") k8s.InstanceUpsertCondition(&inst.Status, k8s.DatabaseInstanceReady, metav1.ConditionTrue, k8s.CreateComplete, "bootstrap standby completed") return ctrl.Result{Requeue: true}, nil default: log.Info("standbyStateMachine: no action needed, proceed with main reconciliation", "unknown state", state) return ctrl.Result{}, nil } } func (r *InstanceReconciler) verifySettings(ctx context.Context, inst *v1alpha1.Instance) (externalErrMsgs []string, err error) { if inst.Spec.DBUniqueName == "" { externalErrMsgs = append(externalErrMsgs, "spec.dbUniqueName is required for standby replication, try adding spec.dbUniqueName in the instance Kubernetes manifest.") return externalErrMsgs, nil } if inst.Spec.CDBName == "" { externalErrMsgs = append(externalErrMsgs, "spec.cdbName is required for standby replication, try adding spec.cdbName in the instance Kubernetes manifest.") return externalErrMsgs, nil } if inst.Spec.Images == nil || inst.Spec.Images["service"] == "" { externalErrMsgs = append(externalErrMsgs, "spec.images.service is required for standby replication, try adding spec.images.service in the instance Kubernetes manifest.") return externalErrMsgs, nil } if inst.Spec.ReplicationSettings == nil { externalErrMsgs = append(externalErrMsgs, "spec.replicationSettings is required for standby replication, try adding spec.replicationSettings in the instance Kubernetes manifest.") return externalErrMsgs, nil } if inst.Spec.ReplicationSettings.PrimaryUser.GsmSecretRef == nil { externalErrMsgs = append(externalErrMsgs, "spec.replicationSettings.primaryCredential.gsmSecretRef is required for standby replication, "+ "try creating a secret to store password in Google Secret Manager and add corresponding spec.replicationSettings.primaryCredential.gsmSecretRef in the instance Kubernetes manifest.") return externalErrMsgs, nil } if inst.Spec.ReplicationSettings.PrimaryUser.Name != "sys" { externalErrMsgs = append(externalErrMsgs, "spec.replicationSettings.primaryUser.name must be sys for standby replication.") return externalErrMsgs, nil } credentialReq, err := toCredentialReq(inst.Spec.ReplicationSettings.PrimaryUser) if err != nil { return nil, err } resp, err := controllers.VerifyStandbySettings(ctx, r, r.DatabaseClientFactory, inst.Namespace, inst.Name, controllers.VerifyStandbySettingsRequest{ PrimaryHost: inst.Spec.ReplicationSettings.PrimaryHost, PrimaryPort: inst.Spec.ReplicationSettings.PrimaryPort, PrimaryService: inst.Spec.ReplicationSettings.PrimaryServiceName, PrimaryUser: inst.Spec.ReplicationSettings.PrimaryUser.Name, PrimaryCredential: credentialReq, StandbyDbUniqueName: inst.Spec.DBUniqueName, StandbyCdbName: inst.Spec.CDBName, BackupGcsPath: inst.Spec.ReplicationSettings.BackupURI, PasswordFileGcsPath: inst.Spec.ReplicationSettings.PasswordFileURI, StandbyVersion: inst.Spec.Version, }) if err != nil { return nil, err } settingErrs := resp.Errors if settingErrs != nil && len(settingErrs) > 0 { for _, settingErr := range settingErrs { externalErrMsgs = append(externalErrMsgs, fmt.Sprintf("%s: %s", settingErr.Type.String(), settingErr.Detail)) } } return externalErrMsgs, nil } func (r *InstanceReconciler) reconcileDataGuard(ctx context.Context, inst *v1alpha1.Instance) error { standbyHost, err := r.getStandbyHost(ctx, inst) if err != nil { return err } credentialReq, err := toCredentialReq(inst.Spec.ReplicationSettings.PrimaryUser) if err != nil { return err } if err := controllers.SetUpDataGuard(ctx, r, r.DatabaseClientFactory, inst.Namespace, inst.Name, controllers.SetUpDataGuardRequest{ PrimaryHost: inst.Spec.ReplicationSettings.PrimaryHost, PrimaryPort: inst.Spec.ReplicationSettings.PrimaryPort, PrimaryService: inst.Spec.ReplicationSettings.PrimaryServiceName, PrimaryUser: inst.Spec.ReplicationSettings.PrimaryUser.Name, PrimaryCredential: credentialReq, StandbyDbUniqueName: inst.Spec.DBUniqueName, StandbyHost: standbyHost, PasswordFileGcsPath: inst.Spec.ReplicationSettings.PasswordFileURI, }); err != nil { return err } inst.Status.CurrentReplicationSettings = inst.Spec.ReplicationSettings return nil } func (r *InstanceReconciler) updateDataGuardStatus(ctx context.Context, inst *v1alpha1.Instance, interval time.Duration, log logr.Logger) { // decide whether to update Data Guard output, // This is a workaround to avoid the reconciliation call again due to status update. if inst.Status.DataGuardOutput != nil && metav1.Now().Sub(inst.Status.DataGuardOutput.LastUpdateTime.Time) < interval { log.Info("skipped Data Guard update", "last update time", inst.Status.DataGuardOutput.LastUpdateTime) return } resp, err := controllers.DataGuardStatus(ctx, r, r.DatabaseClientFactory, inst.Namespace, inst.Name, controllers.DataGuardStatusRequest{StandbyDbUniqueName: inst.Spec.DBUniqueName}) if err == nil { inst.Status.DataGuardOutput = &v1alpha1.DataGuardOutput{ LastUpdateTime: metav1.Now(), StatusOutput: resp.Output, } } else { inst.Status.DataGuardOutput = &v1alpha1.DataGuardOutput{ LastUpdateTime: metav1.Now(), StatusOutput: []string{internalErrToMsg(err)}, } } } func (r *InstanceReconciler) bootstrapStandby(ctx context.Context, inst *v1alpha1.Instance) error { req := &controllers.BootstrapStandbyRequest{ CdbName: inst.Spec.CDBName, Version: inst.Spec.Version, Dbdomain: controllers.GetDBDomain(inst), } migratedPDBs, err := controllers.BootstrapStandby(ctx, r, r.DatabaseClientFactory, inst.GetNamespace(), inst.GetName(), *req) if err != nil { return fmt.Errorf("failed to bootstrap the standby instance: %v", err) } // Create missing resources for migrated database. for _, pdb := range migratedPDBs { var users []v1alpha1.UserSpec for _, u := range pdb.Users { var privs []v1alpha1.PrivilegeSpec for _, p := range u.Privs { privs = append(privs, v1alpha1.PrivilegeSpec(p)) } users = append(users, v1alpha1.UserSpec{ UserSpec: commonv1alpha1.UserSpec{ Name: u.UserName, }, Privileges: privs, }) } database := &v1alpha1.Database{ ObjectMeta: metav1.ObjectMeta{ Namespace: inst.GetNamespace(), Name: pdb.PdbName, }, Spec: v1alpha1.DatabaseSpec{ DatabaseSpec: commonv1alpha1.DatabaseSpec{ Name: pdb.PdbName, Instance: inst.GetName(), }, Users: users, }, } err := r.Client.Create(ctx, database) if apierrors.IsAlreadyExists(err) { if err := r.Client.Patch(ctx, database, client.Apply); err != nil { return fmt.Errorf("bootstrapStandby failed to patch database resource: %v", err) } } else if err != nil { return fmt.Errorf("bootstrapStandby failed to create database resource: %v", err) } } return nil } func (r *InstanceReconciler) updateStandbyDataReplicationStatus(ctx context.Context, inst *v1alpha1.Instance, cs metav1.ConditionStatus, nextState, msg string, errMsgs ...string) { if len(errMsgs) > 0 { sort.Strings(errMsgs) // TODO better message format. msg = fmt.Sprintf("%s\n%s", msg, strings.Join(errMsgs, "\n")) } k8s.InstanceUpsertCondition( &inst.Status, k8s.StandbyDRReady, cs, nextState, msg) } func (r *InstanceReconciler) reconcilePromoteStandby(ctx context.Context, inst *v1alpha1.Instance, log logr.Logger) error { if inst.Status.CurrentReplicationSettings == nil { log.Info("reconcilePromoteStandby: skipping as promote standby completed.") return nil } standbyHost, err := r.getStandbyHost(ctx, inst) if err != nil { return err } credentialReq, err := toCredentialReq(inst.Status.CurrentReplicationSettings.PrimaryUser) if err != nil { return err } if err := controllers.PromoteStandby(ctx, r, r.DatabaseClientFactory, inst.Namespace, inst.Name, controllers.PromoteStandbyRequest{ PrimaryHost: inst.Status.CurrentReplicationSettings.PrimaryHost, PrimaryPort: inst.Status.CurrentReplicationSettings.PrimaryPort, PrimaryService: inst.Status.CurrentReplicationSettings.PrimaryServiceName, PrimaryUser: inst.Status.CurrentReplicationSettings.PrimaryUser.Name, PrimaryCredential: credentialReq, StandbyDbUniqueName: inst.Spec.DBUniqueName, StandbyHost: standbyHost, }); err != nil { return err } return nil } func (r *InstanceReconciler) getStandbyHost(ctx context.Context, inst *v1alpha1.Instance) (string, error) { lbSvcName := fmt.Sprintf(controllers.SvcName, inst.Name) lbSvc := &corev1.Service{} if err := r.Get(ctx, types.NamespacedName{Name: lbSvcName, Namespace: inst.Namespace}, lbSvc); err != nil { return "", err } standbyHost := "" if len(lbSvc.Status.LoadBalancer.Ingress) > 0 { standbyHost = lbSvc.Status.LoadBalancer.Ingress[0].Hostname if standbyHost == "" { standbyHost = lbSvc.Status.LoadBalancer.Ingress[0].IP } } if standbyHost == "" { return "", fmt.Errorf("load balancer service %v not ready", lbSvc) } return standbyHost, nil } func internalErrToMsg(err error) string { // with this helper method, we can decide how to show code internal errors in status return fmt.Sprintf("Internal error: %v .", err) } func toCredentialReq(userSpec commonv1alpha1.UserSpec) (*controllers.Credential, error) { if userSpec.GsmSecretRef != nil { return &controllers.Credential{ Source: &controllers.CredentialGsmSecretReference{GsmSecretReference: &controllers.GsmSecretReference{ ProjectId: userSpec.GsmSecretRef.ProjectId, SecretId: userSpec.GsmSecretRef.SecretId, Version: userSpec.GsmSecretRef.Version, }}, }, nil } return nil, errors.New("failed to find a valid credential spec") } func findLogDiskSize(inst *v1alpha1.Instance) int64 { diskName := "LogDisk" if inst.Spec.Disks != nil { for _, d := range inst.Spec.Disks { if d.Name == diskName && !d.Size.IsZero() { return d.Size.Value() } } } defaultLogDiskSpec, _ := controllers.DefaultDiskSpecs[diskName] return defaultLogDiskSpec.Size.Value() } func replicationSettingsNilErr(settings *v1alpha1.ReplicationSettings) string { var s string if b, err := json.Marshal(settings); err == nil { s = string(b) } else { s = fmt.Sprintf("%+v", settings) } return fmt.Sprintf("spec.replicationSettings must be specified for a standby instance before promotion ready state. "+ "Try adding back spec.replicationSettings to the instance Kubernetes manifest. "+ "Last known replicationSettings: %s", s) }