common/pkg/utils/utils.go (241 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package utils features auxiliary functions for the Anthos DB Operator compliant resources. package utils import ( "context" "errors" "fmt" "net" "regexp" snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" commonv1alpha1 "github.com/GoogleCloudPlatform/elcarro-oracle-operator/common/api/v1alpha1" ) const ( PlatformGCP = "GCP" PlatformBareMetal = "BareMetal" PlatformMinikube = "Minikube" PlatformKind = "Kind" EnginePostgres = "Postgres" EngineOracle = "Oracle" defaultStorageClassNameGCP = "standard-rwo" defaultVolumeSnapshotClassNameGCP = "csi-gce-pd-snapshot-class" defaultStorageClassNameBM = "csi-trident" defaultVolumeSnapshotClassNameBM = "csi-trident-snapshot-class" defaultStorageClassNameMinikube = "csi-hostpath-sc" defaultVolumeSnapshotClassNameMinikube = "csi-hostpath-snapclass" ) var ( ErrPodUnschedulable = errors.New("Pod is unschedulable") ErrNoResources = errors.New("Insufficient resources to create Pod") NoResourcesMessageRegexp = "Insufficient memory|Insufficient cpu|enough free storage" ) type platformConfig struct { storageClassName string volumeSnapshotClassName string } func getPlatformConfig(p string, e string) (*platformConfig, error) { // for Postgres, it allows to have no platform specified if p == "" && e == EnginePostgres { return &platformConfig{}, nil } switch p { case PlatformGCP: return &platformConfig{ storageClassName: defaultStorageClassNameGCP, volumeSnapshotClassName: defaultVolumeSnapshotClassNameGCP, }, nil case PlatformBareMetal: return &platformConfig{ storageClassName: defaultStorageClassNameBM, volumeSnapshotClassName: defaultVolumeSnapshotClassNameBM, }, nil case PlatformMinikube, PlatformKind: return &platformConfig{ storageClassName: defaultStorageClassNameMinikube, volumeSnapshotClassName: defaultVolumeSnapshotClassNameMinikube, }, nil default: return nil, fmt.Errorf("the current release doesn't support deployment platform %q", p) } } func FindDiskSize(diskSpec *commonv1alpha1.DiskSpec, configSpec *commonv1alpha1.ConfigSpec, defaultDiskSpecs map[string]commonv1alpha1.DiskSpec, defaultDiskSize resource.Quantity) resource.Quantity { spec, exists := defaultDiskSpecs[diskSpec.Name] if !diskSpec.Size.IsZero() { return diskSpec.Size } if configSpec != nil { for _, d := range configSpec.Disks { if d.Name == diskSpec.Name { if !d.Size.IsZero() { return d.Size } break } } } if exists { return spec.Size } return defaultDiskSize } func FindStorageClassName(diskSpec *commonv1alpha1.DiskSpec, configSpec *commonv1alpha1.ConfigSpec, defaultPlatform string, engineType string) (string, error) { if diskSpec.StorageClass != "" { return diskSpec.StorageClass, nil } if configSpec != nil { for _, d := range configSpec.Disks { if d.Name == diskSpec.Name { if d.StorageClass != "" { return d.StorageClass, nil } break } } if configSpec.StorageClass != "" { return configSpec.StorageClass, nil } } platform := setPlatform(defaultPlatform, configSpec) pc, err := getPlatformConfig(platform, engineType) if err != nil { return "", err } return pc.storageClassName, nil } func setPlatform(defaultPlatform string, configSpec *commonv1alpha1.ConfigSpec) string { platform := defaultPlatform if configSpec != nil && configSpec.Platform != "" { platform = configSpec.Platform } return platform } func FindVolumeSnapshotClassName(volumneSnapshotClass string, configSpec *commonv1alpha1.ConfigSpec, defaultPlatform string, engineType string) (string, error) { if volumneSnapshotClass != "" { return volumneSnapshotClass, nil } if configSpec != nil && configSpec.VolumeSnapshotClass != "" { return configSpec.VolumeSnapshotClass, nil } platform := setPlatform(defaultPlatform, configSpec) pc, err := getPlatformConfig(platform, engineType) if err != nil { return "", err } return pc.volumeSnapshotClassName, nil } // DiskSpaceTotal is a helper function to calculate the total amount // of allocated space across all disks requested for an instance. func DiskSpaceTotal(inst commonv1alpha1.Instance) (int64, error) { spec := inst.InstanceSpec() if spec.Disks == nil { return -1, fmt.Errorf("failed to detect requested disks for inst: %v", spec) } var total int64 for _, d := range spec.Disks { i, ok := d.Size.AsInt64() if !ok { return -1, fmt.Errorf("Invalid size provided for disk: %v. An integer must be provided.\n", d) } total += i } return total, nil } // SnapshotDisks takes a snapshot of each disk as provided in diskSpecs. Ownership of the snapshots are granted to owner object. // getPvcSnapshotName is a function that, given a DiskSpec, returns the full PVC name, snapshot name, and volumeSnapshotClassName of that disk. // Taking snapshots here is best-effort only: it will returns errors even if only 1 disk failed the snapshot, and upon retry it will try to take snapshot of all disks again. func SnapshotDisks(ctx context.Context, diskSpecs []commonv1alpha1.DiskSpec, owner metav1.Object, c client.Client, scheme *runtime.Scheme, getPvcSnapshotName func(commonv1alpha1.DiskSpec) (string, string, string), applyOpts []client.PatchOption) error { for _, diskSpec := range diskSpecs { fullPVCName, snapshotName, vsc := getPvcSnapshotName(diskSpec) snap, err := newSnapshot(owner, scheme, fullPVCName, snapshotName, vsc) if err != nil { return err } if err := c.Patch(ctx, snap, client.Apply, applyOpts...); err != nil { return err } } return nil } // newSnapshot returns the snapshot for the given pv and set owner to own that snapshot. func newSnapshot(owner v1.Object, scheme *runtime.Scheme, pvcName, snapName, volumeSnapshotClassName string) (*snapv1.VolumeSnapshot, error) { snapshot := &snapv1.VolumeSnapshot{ TypeMeta: metav1.TypeMeta{APIVersion: snapv1.SchemeGroupVersion.String(), Kind: "VolumeSnapshot"}, ObjectMeta: metav1.ObjectMeta{Name: snapName, Namespace: owner.GetNamespace(), Labels: map[string]string{"name": snapName}}, Spec: snapv1.VolumeSnapshotSpec{ Source: snapv1.VolumeSnapshotSource{PersistentVolumeClaimName: &pvcName}, VolumeSnapshotClassName: func() *string { s := string(volumeSnapshotClassName); return &s }(), }, } // Set the owner resource to own the VolumeSnapshot resource. if err := ctrl.SetControllerReference(owner, snapshot, scheme); err != nil { return snapshot, err } return snapshot, nil } // LoadBalancerAnnotations returns cloud provider specific annotations that must be attached to a LoadBalancer k8s service during creation func LoadBalancerAnnotations(options *commonv1alpha1.DBLoadBalancerOptions) map[string]string { var annotations map[string]string if options != nil { if options.GCP.LoadBalancerType == "Internal" { annotations = map[string]string{ "cloud.google.com/load-balancer-type": "Internal", } } } return annotations } // LoadBalancerIpAddress returns an IP address address for the Load Balancer if specified. Otherwise, the empty string is returned. func LoadBalancerIpAddress(options *commonv1alpha1.DBLoadBalancerOptions) string { if options != nil { return options.GCP.LoadBalancerIP } return "" } // LoadBalancerURL returns a URL that can be used to connect to a Load Balancer. func LoadBalancerURL(svc *corev1.Service, port int) string { if svc == nil || len(svc.Status.LoadBalancer.Ingress) == 0 { return "" } hostName := svc.Status.LoadBalancer.Ingress[0].Hostname if hostName == "" { hostName = svc.Status.LoadBalancer.Ingress[0].IP } return net.JoinHostPort(hostName, fmt.Sprintf("%d", port)) } func ObjectKeyOf(sts *appsv1.StatefulSet, pvc *corev1.PersistentVolumeClaim, i int) client.ObjectKey { // name template from https://github.com/kubernetes/kubernetes/blob/v1.23.5/pkg/controller/ssettatefulset/stateful_set_utils.go#L96 return client.ObjectKey{ Namespace: sts.Namespace, Name: fmt.Sprintf("%v-%v-%v", pvc.GetName(), sts.Name, i), } } // Check if POD initialiazation is stuck because of insufficient resources // It will return error if Pod creation is stuck and needs manual intervention. func VerifyPodsStatus(ctx context.Context, cli client.Client, sts *appsv1.StatefulSet) error { pods, err := FindPods(ctx, cli, sts) if err != nil { return err } for _, pod := range pods { if cond := GetPodCondition(pod, corev1.PodScheduled); cond != nil && cond.Reason == corev1.PodReasonUnschedulable { r := regexp.MustCompile(NoResourcesMessageRegexp) if match := r.MatchString(cond.Message); match { return fmt.Errorf("%s: %w", cond.Message, ErrNoResources) } return fmt.Errorf("%s: %w", cond.Message, ErrPodUnschedulable) } } return nil } func FindPods(ctx context.Context, cli client.Client, sts *appsv1.StatefulSet) ([]corev1.Pod, error) { if sts == nil || sts.Spec.Selector == nil { return nil, nil } var pods corev1.PodList labels := sts.Spec.Selector.MatchLabels if err := cli.List(ctx, &pods, client.MatchingLabels(labels), client.InNamespace(sts.Namespace)); err != nil { return nil, err } return pods.Items, nil } func GetPodCondition(pod corev1.Pod, condType corev1.PodConditionType) *corev1.PodCondition { for _, cond := range pod.Status.Conditions { if cond.Type == condType { return &cond } } return nil }