pkg/controller/common/volume/pvc_expansion.go (304 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package volume
import (
"context"
"encoding/json"
"fmt"
"strings"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset"
volumevalidations "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/volume/validations"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
)
// handleVolumeExpansion works around the immutability of VolumeClaimTemplates in StatefulSets by:
// 1. updating storage requests in PVCs whose storage class supports volume expansion
// 2. scheduling the StatefulSet for recreation with the new storage spec
// It returns a boolean indicating whether the StatefulSet needs to be recreated.
// Note that some storage drivers also require Pods to be deleted/recreated for the filesystem to be resized
// (as opposed to a hot resize while the Pod is running). This is left to the responsibility of the user.
// This should be handled differently once supported by the StatefulSet controller: https://github.com/kubernetes/kubernetes/issues/68737.
func HandleVolumeExpansion(
ctx context.Context,
k8sClient k8s.Client,
owner client.Object,
ownerKind string,
expectedSset appsv1.StatefulSet,
actualSset appsv1.StatefulSet,
validateStorageClass bool,
) (bool, error) {
// ensure there are no incompatible storage size modification
if err := volumevalidations.ValidateClaimsStorageUpdate(
ctx,
k8sClient,
actualSset.Spec.VolumeClaimTemplates,
expectedSset.Spec.VolumeClaimTemplates,
validateStorageClass); err != nil {
return false, err
}
// resize all PVCs that can be resized
err := resizePVCs(ctx, k8sClient, owner, expectedSset, actualSset)
if err != nil {
return false, err
}
// schedule the StatefulSet for recreation if needed
if needsRecreate(expectedSset, actualSset) {
return true, annotateForRecreation(ctx, k8sClient, owner, ownerKind, actualSset, expectedSset.Spec.VolumeClaimTemplates)
}
return false, nil
}
// ResizePVCs updates the spec of all existing PVCs whose storage requests can be expanded,
// according to their storage class and what's specified in the expected claim.
// It returns an error if the requested storage size is incompatible with the PVC.
func resizePVCs(
ctx context.Context,
k8sClient k8s.Client,
owner client.Object,
expectedSset appsv1.StatefulSet,
actualSset appsv1.StatefulSet,
) error {
// match each existing PVC with an expected claim, and decide whether the PVC should be resized
actualPVCs, err := sset.RetrieveActualPVCs(k8sClient, actualSset)
if err != nil {
return err
}
for claimName, pvcs := range actualPVCs {
expectedClaim := sset.GetClaim(expectedSset.Spec.VolumeClaimTemplates, claimName)
if expectedClaim == nil {
continue
}
for _, pvc := range pvcs {
pvc := pvc
storageCmp := k8s.CompareStorageRequests(pvc.Spec.Resources, expectedClaim.Spec.Resources)
if !storageCmp.Increase {
// not an increase, nothing to do
continue
}
accessor := meta.NewAccessor()
name, _ := accessor.Name(owner)
newSize := expectedClaim.Spec.Resources.Requests.Storage()
ulog.FromContext(ctx).Info("Resizing PVC storage requests. Depending on the volume provisioner, "+
"Pods may need to be manually deleted for the filesystem to be resized.",
"namespace", pvc.Namespace, "name", name, "pvc_name", pvc.Name,
"old_value", pvc.Spec.Resources.Requests.Storage().String(), "new_value", newSize.String())
pvc.Spec.Resources.Requests[corev1.ResourceStorage] = *newSize
if err := k8sClient.Update(ctx, &pvc); err != nil {
return err
}
}
}
return nil
}
// AnnotateForRecreation stores the StatefulSet spec with updated storage requirements
// in an annotation of the owning resource, to be recreated at the next reconciliation.
func annotateForRecreation(
ctx context.Context,
k8sClient k8s.Client,
owner client.Object,
ownerKind string,
actualSset appsv1.StatefulSet,
expectedClaims []corev1.PersistentVolumeClaim,
) error {
namespacedName := namespacedNameFromObject(owner)
ulog.FromContext(ctx).Info("Preparing StatefulSet re-creation to account for PVC resize",
"namespace", namespacedName.Namespace, "name", namespacedName.Name, "statefulset_name", actualSset.Name)
actualSset.Spec.VolumeClaimTemplates = expectedClaims
asJSON, err := json.Marshal(actualSset)
if err != nil {
return err
}
err = setAnnotation(owner, getRecreateStatefulSetAnnotationKey(ownerKind, actualSset.Name), string(asJSON))
if err != nil {
return err
}
return k8sClient.Update(ctx, owner)
}
// needsRecreate returns true if the StatefulSet needs to be re-created to account for volume expansion.
func needsRecreate(expectedSset appsv1.StatefulSet, actualSset appsv1.StatefulSet) bool {
for _, expectedClaim := range expectedSset.Spec.VolumeClaimTemplates {
actualClaim := sset.GetClaim(actualSset.Spec.VolumeClaimTemplates, expectedClaim.Name)
if actualClaim == nil {
continue
}
storageCmp := k8s.CompareStorageRequests(actualClaim.Spec.Resources, expectedClaim.Spec.Resources)
if storageCmp.Increase {
return true
}
}
return false
}
// RecreateStatefulSets re-creates StatefulSets as specified in annotations, to account for
// resized volume claims.
// This function acts as a state machine that depends on the annotation and the UID of existing StatefulSets.
// A standard flow may span over multiple reconciliations like this:
// 1. No annotation set: nothing to do.
// 2. An annotation specifies StatefulSet Foo needs to be recreated. That StatefulSet actually exists: delete it.
// 3. An annotation specifies StatefulSet Foo needs to be recreated. That StatefulSet does not exist: create it.
// 4. An annotation specifies StatefulSet Foo needs to be recreated. That StatefulSet actually exists, but with
// a different UID: the re-creation is over, remove the annotation.
func RecreateStatefulSets(ctx context.Context, k8sClient k8s.Client, owner client.Object, ownerKind string) (int, error) {
log := ulog.FromContext(ctx)
recreateList, err := ssetsToRecreate(owner, ownerKind)
if err != nil {
return 0, err
}
recreations := len(recreateList)
for annotation, toRecreate := range recreateList {
toRecreate := toRecreate
namespacedName := namespacedNameFromObject(owner)
var existing appsv1.StatefulSet
err = k8sClient.Get(ctx, k8s.ExtractNamespacedName(&toRecreate), &existing)
switch {
// error case
case err != nil && !apierrors.IsNotFound(err):
return recreations, err
// already exists with the same UID: deletion case
case existing.UID == toRecreate.UID && !apierrors.IsNotFound(err):
log.Info("Deleting StatefulSet to account for resized PVCs, it will be recreated automatically",
"namespace", namespacedName.Namespace, "name", namespacedName.Name, "statefulset_name", existing.Name)
// mark the Pod as owned by the component resource while the StatefulSet is removed
if err := updatePodOwners(ctx, k8sClient, owner, ownerKind, existing); err != nil {
return recreations, err
}
if err := deleteStatefulSet(ctx, k8sClient, existing); err != nil {
if apierrors.IsNotFound(err) {
return recreations, nil
}
return recreations, err
}
// already deleted: creation case
case err != nil && apierrors.IsNotFound(err):
log.Info("Re-creating StatefulSet to account for resized PVCs",
"namespace", namespacedName.Namespace, "name", namespacedName.Name, "statefulset_name", toRecreate.Name)
if err := createStatefulSet(ctx, k8sClient, toRecreate); err != nil {
return recreations, err
}
// already recreated (existing.UID != toRecreate.UID): we're done
default:
// remove the temporary pod owner set before the StatefulSet was deleted
if err := removePodOwner(ctx, k8sClient, owner, ownerKind, existing); err != nil {
return recreations, err
}
// remove the annotation
err := deleteAnnotation(owner, annotation)
if err != nil {
return recreations, err
}
if err := k8sClient.Update(ctx, owner); err != nil {
return recreations, err
}
// one less recreation
recreations--
}
}
return recreations, nil
}
func deleteStatefulSet(ctx context.Context, k8sClient k8s.Client, sset appsv1.StatefulSet) error {
opts := client.DeleteOptions{}
// ensure we are not deleting the StatefulSet that was already recreated with a different UID
opts.Preconditions = &metav1.Preconditions{UID: &sset.UID}
// ensure Pods are not also deleted
orphanPolicy := metav1.DeletePropagationOrphan
opts.PropagationPolicy = &orphanPolicy
ulog.FromContext(ctx).V(1).Info("Deleting stateful set", "statefulset_name", sset.Name, "namespace", sset.Namespace)
return k8sClient.Delete(ctx, &sset, &opts)
}
func createStatefulSet(ctx context.Context, k8sClient k8s.Client, sset appsv1.StatefulSet) error {
// don't keep metadata inherited from the old StatefulSet
newObjMeta := metav1.ObjectMeta{
Name: sset.Name,
Namespace: sset.Namespace,
Labels: sset.Labels,
Annotations: sset.Annotations,
OwnerReferences: sset.OwnerReferences,
Finalizers: sset.Finalizers,
}
sset.ObjectMeta = newObjMeta
return k8sClient.Create(ctx, &sset)
}
// updatePodOwners marks all Pods managed by the given StatefulSet as owned by the parent resource.
// Pods are already owned by the StatefulSet resource, but when we'll (temporarily) delete that StatefulSet
// they won't be owned anymore. At this point if the resource is deleted (before the StatefulSet
// is re-created), we also want the Pods to be deleted automatically.
func updatePodOwners(ctx context.Context, k8sClient k8s.Client, owner client.Object, ownerKind string, statefulSet appsv1.StatefulSet) error {
namespacedName := namespacedNameFromObject(owner)
ulog.FromContext(ctx).V(1).Info("Setting an owner ref to the component resource on the future orphan Pods",
"namespace", namespacedName.Namespace, "name", namespacedName.Name, "statefulset_name", statefulSet.Name)
return updatePods(ctx, k8sClient, getStatefulSetLabelName(ownerKind), statefulSet, func(p *corev1.Pod) error {
return controllerutil.SetOwnerReference(owner, p, scheme.Scheme)
})
}
// removePodOwner removes any reference to the resource from the Pods, that was set in updatePodOwners.
func removePodOwner(ctx context.Context, k8sClient k8s.Client, owner client.Object, ownerKind string, statefulSet appsv1.StatefulSet) error {
accessor := meta.NewAccessor()
name, _ := accessor.Name(owner)
namespace, _ := accessor.Namespace(owner)
UID, err := accessor.UID(owner)
if err != nil {
return err
}
ulog.FromContext(ctx).V(1).Info("Removing any Pod owner ref set to the component resource after StatefulSet re-creation",
"namespace", namespace, "name", name, "statefulset_name", statefulSet.Name)
updateFunc := func(p *corev1.Pod) error {
for i, ownerRef := range p.OwnerReferences {
if ownerRef.UID == UID && ownerRef.Name == name && ownerRef.Kind == ownerKind {
// remove from the owner ref slice
p.OwnerReferences = append(p.OwnerReferences[:i], p.OwnerReferences[i+1:]...)
return nil
}
}
return nil
}
return updatePods(ctx, k8sClient, getStatefulSetLabelName(ownerKind), statefulSet, updateFunc)
}
// updatePods applies updateFunc on all existing Pods from the StatefulSet, then update those Pods.
func updatePods(ctx context.Context, k8sClient k8s.Client, label string, statefulSet appsv1.StatefulSet, updateFunc func(p *corev1.Pod) error) error {
pods, err := sset.GetActualPodsForStatefulSet(k8sClient, k8s.ExtractNamespacedName(&statefulSet), label)
if err != nil {
return err
}
for i := range pods {
if err := updateFunc(&pods[i]); err != nil {
return err
}
if err := k8sClient.Update(ctx, &pods[i]); err != nil {
return err
}
}
return nil
}
func namespacedNameFromObject(owner client.Object) types.NamespacedName {
accessor := meta.NewAccessor()
name, err := accessor.Name(owner)
if err != nil {
name = "-"
}
namespace, _ := accessor.Namespace(owner)
if err != nil {
namespace = "-"
}
return types.NamespacedName{Name: name, Namespace: namespace}
}
func setAnnotation(owner client.Object, annotationKey string, annotationValue string) error {
accessor := meta.NewAccessor()
annotations, err := accessor.Annotations(owner)
if err != nil {
return err
}
if annotations == nil {
annotations = make(map[string]string, 1)
}
annotations[annotationKey] = annotationValue
err = accessor.SetAnnotations(owner, annotations)
if err != nil {
return err
}
return nil
}
func deleteAnnotation(owner client.Object, annotation string) error {
accessor := meta.NewAccessor()
annotations, err := accessor.Annotations(owner)
if err != nil {
return err
}
if annotations == nil {
annotations = make(map[string]string, 1)
}
delete(annotations, annotation)
err = accessor.SetAnnotations(owner, annotations)
if err != nil {
return err
}
return nil
}
// ssetsToRecreate returns the list of StatefulSet that should be recreated, based on annotations
// in the parent component resource.
func ssetsToRecreate(owner client.Object, ownerKind string) (map[string]appsv1.StatefulSet, error) {
accessor := meta.NewAccessor()
annotations, err := accessor.Annotations(owner)
if err != nil {
return nil, err
}
annotationPrefix := getRecreateStatefulSetAnnotationPrefix(ownerKind)
toRecreate := map[string]appsv1.StatefulSet{}
for key, value := range annotations {
if !strings.HasPrefix(key, annotationPrefix) {
continue
}
var sset appsv1.StatefulSet
if err := json.Unmarshal([]byte(value), &sset); err != nil {
return nil, err
}
toRecreate[key] = sset
}
return toRecreate, nil
}
func getStatefulSetLabelName(ownerKind string) string {
return fmt.Sprintf("%s.k8s.elastic.co/statefulset-name", strings.ToLower(ownerKind))
}
func getRecreateStatefulSetAnnotationPrefix(ownerKind string) string {
return fmt.Sprintf("%s.k8s.elastic.co/recreate-", strings.ToLower(ownerKind))
}
func getRecreateStatefulSetAnnotationKey(ownerKind string, ssetName string) string {
return fmt.Sprintf("%s%s", getRecreateStatefulSetAnnotationPrefix(ownerKind), ssetName)
}