oracle/controllers/pitrcontroller/pitr_controller.go (446 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pitrcontroller import ( "context" "errors" "fmt" "sort" "strconv" "strings" "time" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/util" "github.com/go-logr/logr" "github.com/robfig/cron" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" commonv1alpha1 "github.com/GoogleCloudPlatform/elcarro-oracle-operator/common/api/v1alpha1" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/api/v1alpha1" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/controllers" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/consts" pb "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/pitr/proto" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/k8s" ) const ( deploymentTemplate = "%s-pitr-agent-deployment" // PITRSvcTemplate is a string template for agent service names. PITRSvcTemplate = "%s-pitr-agent-svc" agentName = "pitr-agent" pitrCmd = "/pitr_agent" agentImageKey = "agent" // DefaultPITRAgentPort is PITR Agent's default port number. DefaultPITRAgentPort = 3204 ) var ( requeueInterval = 10 * time.Second ) type backupControl interface { List(ctx context.Context, opts ...client.ListOption) ([]v1alpha1.Backup, error) } type pitrControl interface { AvailableRecoveryWindows(ctx context.Context, p *v1alpha1.PITR) ([]*pb.Range, error) UpdateStatus(ctx context.Context, p *v1alpha1.PITR) error } // PITRReconciler reconciles a PITR object type PITRReconciler struct { client.Client Log logr.Logger Scheme *runtime.Scheme BackupCtrl backupControl PITRCtrl pitrControl } // +kubebuilder:rbac:groups=oracle.db.anthosapis.com,resources=pitrs,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=oracle.db.anthosapis.com,resources=pitrs/status,verbs=get;update;patch // +kubebuilder:rbac:groups=oracle.db.anthosapis.com,resources=instances,verbs=get;watch;list; // +kubebuilder:rbac:groups=core,resources=services,verbs=list;watch;get;patch;create // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete func (r *PITRReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.Log.WithValues("pitr", req.NamespacedName) log.Info("reconciling PITR requests") var p v1alpha1.PITR if err := r.Get(ctx, req.NamespacedName, &p); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } validationErrs := validatePITRSpec(p.Spec) if len(validationErrs) != 0 { log.Error(errors.New("PITR spec is invalid"), strings.Join(validationErrs, "\n")) p.Status.Conditions = k8s.Upsert(p.Status.Conditions, k8s.Ready, metav1.ConditionFalse, k8s.CreatePending, strings.Join(validationErrs, "\n")) return ctrl.Result{}, r.PITRCtrl.UpdateStatus(ctx, &p) } var i v1alpha1.Instance if err := r.Get(ctx, types.NamespacedName{ Name: p.Spec.InstanceRef.Name, Namespace: p.GetNamespace(), }, &i); err != nil { return ctrl.Result{}, err } if err := r.ensureAgent(ctx, &p, &i); err != nil { return ctrl.Result{}, err } res, err := r.ensureBackup(ctx, &p, &i) if err != nil || !res.IsZero() { return res, err } if err = r.ensureBackupSchedule(ctx, &p, &i); err != nil { return ctrl.Result{}, err } if err = r.updateStatus(ctx, &p, &i, log); err != nil { return ctrl.Result{}, err } log.Info("reconciling PITR: DONE") return ctrl.Result{RequeueAfter: time.Minute}, nil } func (r *PITRReconciler) ensureBackupSchedule(ctx context.Context, p *v1alpha1.PITR, i *v1alpha1.Instance) error { applyOpts := []client.PatchOption{client.ForceOwnership, client.FieldOwner("pitr-controller")} if err := r.Patch(ctx, backupScheduleTemplate(p, i), client.Apply, applyOpts...); err != nil { return err } return nil } // Ensure at least one successful backup for each incarnation. func (r *PITRReconciler) ensureBackup(ctx context.Context, p *v1alpha1.PITR, i *v1alpha1.Instance) (ctrl.Result, error) { var backups v1alpha1.BackupList if err := r.List(ctx, &backups, client.InNamespace(i.GetNamespace()), client.MatchingLabels{controllers.PITRLabel: p.GetName(), controllers.IncarnationLabel: i.Status.CurrentDatabaseIncarnation}); err != nil { r.Log.Error(err, "failed to get a list of Backups applicable to pitr") return ctrl.Result{}, err } applyOpts := []client.PatchOption{client.ForceOwnership, client.FieldOwner("pitr-controller")} initialBackup := backupTemplate(p, i) initialBackup.ObjectMeta.Name = fmt.Sprintf("%s-incarnation-%s", initialBackup.ObjectMeta.Name, i.Status.CurrentDatabaseIncarnation) if len(backups.Items) == 0 { r.Log.Info("reconciling PITR ensureBackup: initial backup for current incarnation doesn't exist, creating...") if err := r.Patch(ctx, initialBackup, client.Apply, applyOpts...); err != nil { return ctrl.Result{}, err } return ctrl.Result{RequeueAfter: requeueInterval}, nil } else if len(backups.Items) == 1 { initialBackupReadyCond := k8s.FindCondition(backups.Items[0].Status.Conditions, k8s.Ready) switch initialBackupReadyCond.Reason { case k8s.BackupFailed: r.Log.Info("reconciling PITR ensureBackup: initial backup for current incarnation failed, deleting...") if err := r.Delete(ctx, initialBackup); err != nil { return ctrl.Result{}, err } return ctrl.Result{Requeue: true}, nil case k8s.BackupInProgress, k8s.BackupPending: r.Log.Info("reconciling PITR ensureBackup: initial backup for current incarnation in progress, waiting...") return ctrl.Result{RequeueAfter: requeueInterval}, nil } } var allBackups v1alpha1.BackupList if err := r.List(ctx, &allBackups, client.InNamespace(i.GetNamespace()), client.MatchingLabels{controllers.PITRLabel: p.GetName()}); err != nil { r.Log.Error(err, "failed to get a list of Backups applicable to pitr") return ctrl.Result{}, err } p.Status.BackupTotal = len(allBackups.Items) return ctrl.Result{}, r.Status().Update(ctx, p) } // calculateBackupRetentionCnt calculates number of backups to keep based on backup schedule and recover window func calculateBackupRetentionCnt(backupSchedule string, recoverWindow time.Duration) int32 { startTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) endTime := startTime.Add(recoverWindow) // backupSchedule is validated in validatePITRSpec() schedule, _ := cron.ParseStandard(backupSchedule) backupRetentionCnt := int32(0) for startTime.Before(endTime) { startTime = schedule.Next(startTime) backupRetentionCnt++ } backupRetentionCnt++ return backupRetentionCnt } func backupScheduleTemplate(p *v1alpha1.PITR, i *v1alpha1.Instance) *v1alpha1.BackupSchedule { backupSpec := backupTemplate(p, i).Spec // backup schedule defaults to every 4 hours backupSchedule := "0 */4 * * *" // recover window defaults to 7 days recoverWindow := time.Hour * 24 * 7 if p.Spec.BackupSchedule != "" { backupSchedule = p.Spec.BackupSchedule } backupRetentionCnt := calculateBackupRetentionCnt(backupSchedule, recoverWindow) PITRbackupSchedule := &v1alpha1.BackupSchedule{ TypeMeta: metav1.TypeMeta{ APIVersion: "oracle.db.anthosapis.com/v1alpha1", Kind: "BackupSchedule", }, ObjectMeta: metav1.ObjectMeta{ OwnerReferences: []metav1.OwnerReference{ { APIVersion: p.APIVersion, Kind: p.Kind, Name: p.Name, UID: p.UID, }, }, Namespace: i.GetNamespace(), Name: "pitr-backup-schedule", Labels: map[string]string{ controllers.PITRLabel: p.GetName(), }, }, Spec: v1alpha1.BackupScheduleSpec{ BackupScheduleSpec: commonv1alpha1.BackupScheduleSpec{ Schedule: backupSchedule, BackupRetentionPolicy: &commonv1alpha1.BackupRetentionPolicy{ BackupRetention: &backupRetentionCnt, }, }, BackupSpec: backupSpec, BackupLabels: map[string]string{ controllers.PITRLabel: p.GetName(), }, }, } return PITRbackupSchedule } func backupTemplate(p *v1alpha1.PITR, i *v1alpha1.Instance) *v1alpha1.Backup { PITRbackup := &v1alpha1.Backup{ TypeMeta: metav1.TypeMeta{ APIVersion: "oracle.db.anthosapis.com/v1alpha1", Kind: "Backup", }, ObjectMeta: metav1.ObjectMeta{ Name: "pitr-backup", Namespace: i.GetNamespace(), Labels: map[string]string{ controllers.PITRLabel: p.GetName(), }, }, Spec: v1alpha1.BackupSpec{ BackupSpec: commonv1alpha1.BackupSpec{ Instance: i.GetName(), // TODO: support PITR using snapshot backup Type: "Physical", }, Subtype: "Instance", GcsDir: p.Spec.StorageURI, }, } return PITRbackup } func (r *PITRReconciler) ensureAgent(ctx context.Context, p *v1alpha1.PITR, i *v1alpha1.Instance) error { // TODO better validation if p.Spec.Images == nil { return errors.New("PITR .spec.images must be specified") } agentImage, ok := p.Spec.Images[agentImageKey] if !ok { return fmt.Errorf("failed to find an required image from %v, want image with key %s", p.Spec.Images, agentImageKey) } options := []client.PatchOption{client.ForceOwnership, client.FieldOwner("pitr-controller")} pitrLabel := map[string]string{controllers.PITRLabel: p.GetName()} instlabels := map[string]string{"instance": i.GetName()} uid := controllers.DefaultUID if i.Spec.DatabaseUID != nil { uid = *i.Spec.DatabaseUID } gid := controllers.DefaultGID if i.Spec.DatabaseGID != nil { gid = *i.Spec.DatabaseGID } logDiskPVC, logDiskMount := controllers.GetPVCNameAndMount(i.GetName(), "LogDisk") // TODO better way to find the PVC logDiskPVC = fmt.Sprintf("%s-%s-sts-0", logDiskPVC, i.GetName()) deployName := fmt.Sprintf(deploymentTemplate, p.GetName()) // for now, PITR and DB instance are in the same namespace. deployNS := p.GetNamespace() dbdaemonSvc := &corev1.Service{} if err := r.Get(ctx, types.NamespacedName{Name: fmt.Sprintf(controllers.DbdaemonSvcName, i.GetName()), Namespace: i.GetNamespace()}, dbdaemonSvc); err != nil { return err } dbdaemonIP := dbdaemonSvc.Spec.ClusterIP dbdaemonPort := consts.DefaultDBDaemonPort svc := &corev1.Service{ TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String(), Kind: "Service"}, ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf(PITRSvcTemplate, p.GetName()), Namespace: p.GetNamespace()}, Spec: corev1.ServiceSpec{ Selector: pitrLabel, Ports: []corev1.ServicePort{ { Name: "pitr", Protocol: "TCP", Port: DefaultPITRAgentPort, TargetPort: intstr.FromInt(DefaultPITRAgentPort), }, }, Type: corev1.ServiceTypeClusterIP, }, } deployment := &appsv1.Deployment{ TypeMeta: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "Deployment"}, ObjectMeta: metav1.ObjectMeta{Name: deployName, Namespace: deployNS}, Spec: appsv1.DeploymentSpec{ Replicas: pointer.Int32(1), Selector: &metav1.LabelSelector{ MatchLabels: pitrLabel, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: pitrLabel, Namespace: deployNS, }, Spec: corev1.PodSpec{ SecurityContext: &corev1.PodSecurityContext{ RunAsUser: pointer.Int64Ptr(uid), RunAsGroup: pointer.Int64Ptr(gid), FSGroup: pointer.Int64Ptr(gid), RunAsNonRoot: pointer.Bool(true), }, Volumes: []corev1.Volume{ { Name: "log", VolumeSource: corev1.VolumeSource{ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ ClaimName: logDiskPVC, ReadOnly: true, }, }, }, }, Containers: []corev1.Container{ { Name: agentName, Image: agentImage, Command: []string{pitrCmd}, Args: []string{ "--dbservice=" + dbdaemonIP, "--dbport=" + strconv.Itoa(dbdaemonPort), "--dest=" + p.Spec.StorageURI, "--port=" + strconv.Itoa(DefaultPITRAgentPort), }, Ports: []corev1.ContainerPort{ {Name: "pitr-port", Protocol: "TCP", ContainerPort: DefaultPITRAgentPort}, }, SecurityContext: &corev1.SecurityContext{ AllowPrivilegeEscalation: pointer.BoolPtr(false), }, ImagePullPolicy: corev1.PullAlways, VolumeMounts: []corev1.VolumeMount{ { Name: "log", ReadOnly: true, MountPath: logDiskMount, }, }, }, }, // Add pod affinity for pitr agent pod, so that pitr agent pod can access DB disk. Affinity: &corev1.Affinity{ PodAffinity: &corev1.PodAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ { LabelSelector: &metav1.LabelSelector{ MatchLabels: instlabels, }, Namespaces: []string{p.GetNamespace()}, TopologyKey: "kubernetes.io/hostname", }, }, }, }, }, }, }, } if err := ctrl.SetControllerReference(p, deployment, r.Scheme); err != nil { return err } if err := r.Patch(ctx, deployment, client.Apply, options...); err != nil { return err } if err := ctrl.SetControllerReference(p, svc, r.Scheme); err != nil { return err } if err := r.Patch(ctx, svc, client.Apply, options...); err != nil { return err } return nil } func (r *PITRReconciler) updateStatus(ctx context.Context, p *v1alpha1.PITR, i *v1alpha1.Instance, log logr.Logger) error { backups, err := r.BackupCtrl.List(ctx, client.InNamespace(i.GetNamespace()), client.MatchingLabels{controllers.PITRLabel: p.GetName(), controllers.IncarnationLabel: i.Status.CurrentDatabaseIncarnation}) if err != nil { log.Error(err, "failed to get a list of Backups applicable to pitr") return errors.New("failed to update available recovery window") } windows, err := r.PITRCtrl.AvailableRecoveryWindows(ctx, p) if err != nil { log.Error(err, "failed to get status from data plane") return errors.New("failed to update available recovery window") } timeToBackups := make(map[int64]v1alpha1.Backup) for _, b := range backups { if b.Status.Phase != commonv1alpha1.BackupSucceeded { continue } bTimestamp, err := time.Parse(time.RFC3339, b.Annotations[controllers.TimestampAnnotation]) if err != nil { log.Error(err, "failed to parse backup timestamp", "backup", b) continue } timeToBackups[bTimestamp.Unix()] = b } keys := make([]int64, len(timeToBackups)) idx := 0 for k := range timeToBackups { keys[idx] = k idx += 1 } sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) var aWindowTime []v1alpha1.TimeWindow var aWindowSCN []v1alpha1.SCNWindow for _, w := range windows { firstIdx := sort.Search(len(keys), func(i int) bool { return w.GetStart().GetTime().AsTime().Unix() <= keys[i] }) if firstIdx >= len(keys) || keys[firstIdx] > w.GetEnd().GetTime().AsTime().Unix() { continue } aWindowTime = append(aWindowTime, v1alpha1.TimeWindow{ Begin: metav1.NewTime(time.Unix(keys[firstIdx], 0)), End: metav1.NewTime(w.GetEnd().Time.AsTime()), }) aWindowSCN = append(aWindowSCN, v1alpha1.SCNWindow{ Begin: timeToBackups[keys[firstIdx]].Annotations[controllers.SCNAnnotation], End: w.GetEnd().GetScn(), }) } p.Status.AvailableRecoveryWindowTime = aWindowTime p.Status.AvailableRecoveryWindowSCN = aWindowSCN p.Status.CurrentDatabaseIncarnation = i.Status.CurrentDatabaseIncarnation if len(aWindowTime) > 0 && len(aWindowSCN) > 0 { p.Status.Conditions = k8s.Upsert(p.Status.Conditions, k8s.Ready, metav1.ConditionTrue, k8s.CreateComplete, "") } return r.PITRCtrl.UpdateStatus(ctx, p) } func (r *PITRReconciler) instanceToPITR(obj client.Object) []reconcile.Request { var requests []reconcile.Request var PITRList v1alpha1.PITRList inst := obj.(*v1alpha1.Instance) if err := r.List(context.Background(), &PITRList, client.InNamespace(inst.GetNamespace()), client.MatchingLabels{"instance": inst.GetName()}); err != nil { r.Log.Info("Failed to list pitr", "instance", inst.GetName()) return []reconcile.Request{} } for _, pitr := range PITRList.Items { requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ Name: pitr.GetName(), Namespace: pitr.GetNamespace(), }}) } r.Log.Info("Instance event triggered reconcile ", "requests", requests) return requests } func validatePITRSpec(spec v1alpha1.PITRSpec) []string { errMsg := []string{} if !strings.HasPrefix(spec.StorageURI, util.GSPrefix) { errMsg = append(errMsg, "spec.storageURI only suppore GCS schemes.") } if spec.BackupSchedule != "" { if _, err := cron.ParseStandard(spec.BackupSchedule); err != nil { errMsg = append(errMsg, "spec.backupSchedule cannot be parsed.") } } return errMsg } func (r *PITRReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.PITR{}). Watches( &source.Kind{Type: &v1alpha1.Instance{}}, handler.EnqueueRequestsFromMapFunc(r.instanceToPITR), ). Complete(r) }