pkg/controller/elasticsearch/pdb/reconcile.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package pdb

import (
"context"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
commonv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/hash"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/maps"
)

// Reconcile ensures that a PodDisruptionBudget exists for this cluster, inheriting the content of the
// PodDisruptionBudget template from the Elasticsearch spec.
// The default PDB we set up dynamically adapts MinAvailable to the number of nodes in the cluster.
// If the default PDB is explicitly disabled in the spec, Reconcile ensures that none exists.
func Reconcile(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch, statefulSets sset.StatefulSetList) error {
expected, err := expectedPDB(es, statefulSets)
if err != nil {
return err
}
if expected == nil {
return deleteDefaultPDB(ctx, k8sClient, es)
}
// label the PDB with a hash of its content, for comparison purposes
expected.Labels = hash.SetTemplateHashLabel(expected.Labels, expected)
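// the policy/v1 PodDisruptionBudget API is only served on Kubernetes 1.21+; fall back to policy/v1beta1 on older clusters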
v1Available, err := isPDBV1Available(k8sClient)
if err != nil {
return err
}
if v1Available {
reconciled := &policyv1.PodDisruptionBudget{}
return reconciler.ReconcileResource(
reconciler.Params{
Context: ctx,
Client: k8sClient,
Owner: &es,
Expected: expected,
Reconciled: reconciled,
NeedsUpdate: func() bool {
return hash.GetTemplateHashLabel(expected.Labels) != hash.GetTemplateHashLabel(reconciled.Labels)
},
UpdateReconciled: func() {
expected.DeepCopyInto(reconciled)
},
},
)
}
// Fall back to v1beta1
reconciled := &policyv1beta1.PodDisruptionBudget{}
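// convert (defined elsewhere in this package) maps the expected policy/v1 PDB to its policy/v1beta1 equivalent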
converted := convert(expected)
return reconciler.ReconcileResource(
reconciler.Params{
Context: ctx,
Client: k8sClient,
Owner: &es,
Expected: converted,
Reconciled: reconciled,
NeedsUpdate: func() bool {
return hash.GetTemplateHashLabel(converted.Labels) != hash.GetTemplateHashLabel(reconciled.Labels)
},
UpdateReconciled: func() {
converted.DeepCopyInto(reconciled)
},
},
)
}

// deleteDefaultPDB deletes the default PDB if it exists.
func deleteDefaultPDB(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch) error {
// We Get the PDB first because that is a local cache read,
// whereas a Delete call would hit the API server.
v1Available, err := isPDBV1Available(k8sClient)
if err != nil {
return err
}
var pdb client.Object
if v1Available {
pdb = &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Namespace: es.Namespace,
Name: esv1.DefaultPodDisruptionBudget(es.Name),
},
}
} else {
pdb = &policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Namespace: es.Namespace,
Name: esv1.DefaultPodDisruptionBudget(es.Name),
},
}
}
if err := k8sClient.Get(ctx, k8s.ExtractNamespacedName(pdb), pdb); err != nil && !apierrors.IsNotFound(err) {
return err
} else if apierrors.IsNotFound(err) {
// already deleted, which is fine
return nil
}
if err := k8sClient.Delete(ctx, pdb); err != nil && !apierrors.IsNotFound(err) {
return err
}
return nil
}

// expectedPDB returns a PDB according to the given ES spec.
// It may return nil if the PDB has been explicitly disabled in the ES spec.
func expectedPDB(es esv1.Elasticsearch, statefulSets sset.StatefulSetList) (*policyv1.PodDisruptionBudget, error) {
template := es.Spec.PodDisruptionBudget.DeepCopy()
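// a nil template means spec.podDisruptionBudget was not set: keep the default PDB.
// An explicitly set but empty template disables it; IsDisabled handles a nil receiver,
// so calling it before the nil check below is safe.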
if template.IsDisabled() {
return nil, nil
}
if template == nil {
template = &commonv1.PodDisruptionBudgetTemplate{}
}
expected := policyv1.PodDisruptionBudget{
ObjectMeta: template.ObjectMeta,
}
// inherit user-provided ObjectMeta, but set our own name & namespace
expected.Name = esv1.DefaultPodDisruptionBudget(es.Name)
expected.Namespace = es.Namespace
// and append our labels, preserving any user-provided labels that use the same keys
expected.Labels = maps.MergePreservingExistingKeys(expected.Labels, label.NewLabels(k8s.ExtractNamespacedName(&es)))
// set owner reference for deletion upon ES resource deletion
if err := controllerutil.SetControllerReference(&es, &expected, scheme.Scheme); err != nil {
return nil, err
}
if template.Spec.Selector != nil || template.Spec.MaxUnavailable != nil || template.Spec.MinAvailable != nil {
// use the user-defined spec
expected.Spec = template.Spec
} else {
// set our default spec
expected.Spec = buildPDBSpec(es, statefulSets)
}
return &expected, nil
}

// buildPDBSpec returns a PDBSpec computed from the current StatefulSets,
// considering the cluster health and topology.
func buildPDBSpec(es esv1.Elasticsearch, statefulSets sset.StatefulSetList) policyv1.PodDisruptionBudgetSpec {
// compute MinAvailable based on the maximum number of Pods we're supposed to have
nodeCount := statefulSets.ExpectedNodeCount()
// maybe allow some Pods to be disrupted
minAvailable := nodeCount - allowedDisruptions(es, statefulSets)
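// e.g. a green cluster of 3 default (master+data+ingest) nodes allows 1 disruption, hence MinAvailable=2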
minAvailableIntStr := intstr.IntOrString{Type: intstr.Int, IntVal: minAvailable}
return policyv1.PodDisruptionBudgetSpec{
// match all pods for this cluster
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
label.ClusterNameLabelName: es.Name,
},
},
MinAvailable: &minAvailableIntStr,
// MaxUnavailable can only be used if the selector matches a built-in controller selector
// (e.g. Deployments, StatefulSets). We cannot use it with our own cluster-name selector.
MaxUnavailable: nil,
}
}

// allowedDisruptions returns the number of Pods that we allow to be disrupted while keeping the cluster healthy.
func allowedDisruptions(es esv1.Elasticsearch, actualSsets sset.StatefulSetList) int32 {
if actualSsets.ExpectedNodeCount() == 1 {
// single-node cluster (not highly available):
// allow the node to be disrupted so that K8s node operations can still be performed
return 1
}
if es.Status.Health != esv1.ElasticsearchGreenHealth {
// A non-green cluster may become red if we disrupt one node: don't allow it.
// The health information we're using here may be out-of-date; this is best effort.
return 0
}
if actualSsets.ExpectedMasterNodesCount() == 1 {
// There's a risk the single master node of the cluster gets removed: don't allow it.
return 0
}
if actualSsets.ExpectedDataNodesCount() == 1 {
// There's a risk the single data node of the cluster gets removed: don't allow it.
return 0
}
if actualSsets.ExpectedIngestNodesCount() == 1 {
// There's a risk the single ingest node of the cluster gets removed: don't allow it.
return 0
}
// Allow one pod (only) to be disrupted on a healthy cluster.
// We could technically allow more, but the cluster health freshness would become a bigger problem.
return 1
}