pkg/controller/elasticsearch/driver/upgrade_pods_deletion.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package driver
import (
"context"
"sort"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/expectations"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/stringsutil"
)
// Delete runs through a list of potential candidates and selects the ones that can be deleted.
// Do not run this function unless driver expectations are met.
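// The overall flow is: record the upgrade candidates in the status, compute how many deletions the change budget
// allows, sort the candidates (data nodes before masters), filter them through the upgrade predicates, prepare the
// cluster for node restarts, and finally delete the Pods that are ready to be deleted.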
func (ctx *upgradeCtx) Delete() ([]corev1.Pod, error) {
log := ulog.FromContext(ctx.parentCtx)
// Update the status with the list of Pods that may be upgraded.
ctx.reconcileState.RecordNodesToBeUpgraded(k8s.PodNames(ctx.podsToUpgrade))
if len(ctx.podsToUpgrade) == 0 {
// We still want to ensure that predicates in the status are cleared.
ctx.reconcileState.RecordPredicatesResult(map[string]string{})
return nil, nil
}
// Get allowed deletions and check if maxUnavailable has been reached.
allowedDeletions, maxUnavailableReached := ctx.getAllowedDeletions()
// Step 1: Sort the Pods to get the ones with the highest priority first
candidates := make([]corev1.Pod, len(ctx.podsToUpgrade)) // work on a copy in order to have no side effect
copy(candidates, ctx.podsToUpgrade)
sortCandidates(candidates)
// Step 2: Apply predicates
predicateContext := NewPredicateContext(
ctx.parentCtx,
ctx.ES,
ctx.resourcesList,
ctx.esState,
ctx.shardLister,
ctx.healthyPods,
ctx.podsToUpgrade,
ctx.expectedMasters,
ctx.currentPods,
)
log.V(1).Info("Applying predicates",
"maxUnavailableReached", maxUnavailableReached,
"allowedDeletions", allowedDeletions,
)
podsToDelete, err := applyPredicates(predicateContext, candidates, maxUnavailableReached, allowedDeletions, ctx.reconcileState)
if err != nil {
return podsToDelete, err
}
if len(podsToDelete) == 0 {
log.V(1).Info(
"No pod deleted during rolling upgrade",
"es_name", ctx.ES.Name,
"namespace", ctx.ES.Namespace,
)
return podsToDelete, nil
}
if err := ctx.prepareClusterForNodeRestart(podsToDelete); err != nil {
return nil, err
}
// TODO: If a master is changed into a data node (or the opposite), it must be excluded or we should update m_m_n
var deletedPods []corev1.Pod //nolint:prealloc
for _, podToDelete := range podsToDelete {
if err := ctx.handleMasterScaleChange(podToDelete); err != nil {
return deletedPods, err
}
if readyToDelete, err := ctx.readyToDelete(podToDelete); err != nil || !readyToDelete {
return deletedPods, err
}
if err := deletePod(ctx.parentCtx, ctx.client, ctx.ES, podToDelete, ctx.expectations, ctx.reconcileState, "Deleting pod for rolling upgrade"); err != nil {
return deletedPods, err
}
deletedPods = append(deletedPods, podToDelete)
}
return deletedPods, nil
}
// DeleteAll unconditionally deletes all upgradeable Pods after calling the node shutdown API and accounting for quorum
// changes on older versions of Elasticsearch as applicable.
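// Unlike Delete, DeleteAll does not consult the change budget or the upgrade predicates: it only requires every Pod
// to be ready for deletion, and deletes nothing if at least one of them is not.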
func (ctx *upgradeCtx) DeleteAll() ([]corev1.Pod, error) {
if len(ctx.podsToUpgrade) == 0 {
return nil, nil
}
if err := ctx.prepareClusterForNodeRestart(ctx.podsToUpgrade); err != nil {
return nil, err
}
var nonReadyPods []string
for _, podToDelete := range ctx.podsToUpgrade {
if err := ctx.handleMasterScaleChange(podToDelete); err != nil {
return nil, err
}
// do not delete any Pods if at least one is not ready for deletion
readyToDelete, err := ctx.readyToDelete(podToDelete)
if err != nil {
return nil, err
}
if !readyToDelete {
nonReadyPods = append(nonReadyPods, podToDelete.Name)
}
}
if len(nonReadyPods) > 0 {
ulog.FromContext(ctx.parentCtx).Info("Not all Pods are ready for a full cluster upgrade", "pods", nonReadyPods, "namespace", ctx.ES.Namespace, "es_name", ctx.ES.Name)
ctx.reconcileState.RecordNodesToBeUpgradedWithMessage(k8s.PodNames(ctx.podsToUpgrade), "Not all Pods are ready for a full cluster upgrade")
return nil, nil
}
var deletedPods []corev1.Pod //nolint:prealloc
for _, podToDelete := range ctx.podsToUpgrade {
if err := deletePod(ctx.parentCtx, ctx.client, ctx.ES, podToDelete, ctx.expectations, ctx.reconcileState, "Deleting Pod for full cluster upgrade"); err != nil {
// an error during deletion violates the "delete all or nothing" invariant but there is no way around it
return deletedPods, err
}
deletedPods = append(deletedPods, podToDelete)
}
return deletedPods, nil
}
// getAllowedDeletions returns the number of deletions that can be done and if maxUnavailable has been reached.
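// Illustrative example (hypothetical numbers): with 5 expected Pods, 4 of them healthy, and maxUnavailable set to 1,
// allowedDeletions = 1 - (5-4) = 0 and maxUnavailableReached is therefore true.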
func (ctx *upgradeCtx) getAllowedDeletions() (int, bool) {
// Check that we are not over the disruption budget.
// Upscale is done, so we should have the required number of Pods.
actualPods := ctx.statefulSets.PodNames()
unhealthyPods := len(actualPods) - len(ctx.healthyPods)
maxUnavailable := ctx.ES.Spec.UpdateStrategy.ChangeBudget.GetMaxUnavailableOrDefault()
if maxUnavailable == nil {
// maxUnavailable is unbounded; we allow removing all Pods
return len(actualPods), false
}
allowedDeletions := int(*maxUnavailable) - unhealthyPods
// If maxUnavailable is reached the deletion driver still allows one unhealthy Pod to be restarted.
maxUnavailableReached := allowedDeletions <= 0
return allowedDeletions, maxUnavailableReached
}
// sortCandidates is the default sort function: masters have lower priority because we want to update
// the data nodes first. After that, Pods are sorted by StatefulSet name, then by reverse ordinal order.
// TODO: Add some priority to unhealthy (bootlooping) Pods
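// Illustrative example (hypothetical Pod names, assuming only es-master-0 carries the master role label):
// [es-master-0, es-data-1, es-data-0] is sorted into [es-data-1, es-data-0, es-master-0], i.e. data nodes first,
// and the highest ordinal first within a StatefulSet.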
func sortCandidates(allPods []corev1.Pod) {
sort.Slice(allPods, func(i, j int) bool {
pod1 := allPods[i]
pod2 := allPods[j]
// check if either is a master node; masters come after all other roles
if label.IsMasterNode(pod1) && !label.IsMasterNode(pod2) {
return false
}
if !label.IsMasterNode(pod1) && label.IsMasterNode(pod2) {
return true
}
// neither or both are masters: compare StatefulSet names, then ordinals in reverse order
ssetName1, ord1, err := sset.StatefulSetName(pod1.Name)
if err != nil {
return false
}
ssetName2, ord2, err := sset.StatefulSetName(pod2.Name)
if err != nil {
return false
}
if ssetName1 == ssetName2 {
// same name, compare ordinal, higher first
return ord1 > ord2
}
return ssetName1 < ssetName2
})
}
// handleMasterScaleChange handles Zen updates when a type change results in the addition or the removal of a master:
// In case of a master scale down, it shares the same logic as a "traditional" scale down:
// * We proactively set m_m_n to the value of 1 if there are 2 Zen1 masters left
// * We exclude the master for Zen2
// In case of a master scale up there is nothing else to do:
// * If there are Zen1 nodes, m_m_n is updated prior to the update of the StatefulSet in HandleUpscaleAndSpecChanges
// * Because of the design of Zen2 there is nothing else to do for it.
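// Note (assumption about behavior implemented outside this file): Zen1 minimum_master_nodes only matters for
// clusters that still run pre-7.x nodes, while Zen2 voting configuration exclusions apply to 7.x and later;
// updateZenSettingsForDownscale is expected to apply whichever mechanism is relevant for the cluster version.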
func (ctx *upgradeCtx) handleMasterScaleChange(pod corev1.Pod) error {
masterScaleDown := label.IsMasterNode(pod) && !stringsutil.StringInSlice(pod.Name, ctx.expectedMasters)
if masterScaleDown {
if err := updateZenSettingsForDownscale(
ctx.parentCtx,
ctx.client,
ctx.esClient,
ctx.ES,
ctx.reconcileState,
ctx.statefulSets,
pod.Name,
); err != nil {
return err
}
}
return nil
}
func deletePod(
ctx context.Context,
k8sClient k8s.Client,
es esv1.Elasticsearch,
pod corev1.Pod,
expectations *expectations.Expectations,
reconcileState *reconcile.State,
msg string,
) error {
ulog.FromContext(ctx).Info(msg, "es_name", es.Name, "namespace", es.Namespace, "pod_name", pod.Name, "pod_uid", pod.UID)
// The name of the Pod we want to delete is not enough, as it may have already been deleted/recreated.
// The UID of the Pod we want to delete is used as a precondition to check that we actually delete the right one.
// We also check the resource version of the Pod, to make sure its status is the current one and we're not deleting,
// e.g., a Pending Pod that is not Pending anymore.
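// If either precondition does not match (typically because the Pod has been deleted and recreated in the meantime),
// the API server rejects the deletion with an error, and the reconciliation is retried later.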
opt := client.Preconditions{
UID: &pod.UID,
ResourceVersion: &pod.ResourceVersion,
}
err := k8sClient.Delete(ctx, &pod, opt)
if err != nil {
return err
}
// expect the Pod to no longer be in the cache at the next reconciliation
expectations.ExpectDeletion(pod)
// Update status
reconcileState.RecordDeletedNode(pod.Name, msg)
return nil
}
// runPredicates runs all the predicates on a given Pod. The returned *failedPredicate is non-nil if a predicate has failed.
// The returned error is non-nil if one of the predicates encountered an internal error.
func runPredicates(
ctx PredicateContext,
candidate corev1.Pod,
deletedPods []corev1.Pod,
maxUnavailableReached bool,
) (*failedPredicate, error) {
disabledPredicates := ctx.es.DisabledPredicates()
for _, predicate := range predicates {
canDelete, err := predicate.fn(ctx, candidate, deletedPods, maxUnavailableReached)
if err != nil {
return nil, err
}
if !canDelete {
// If this specific predicate is disabled through the annotation
// "eck.k8s.elastic.co/disable-upgrade-predicates", then ignore it
// and continue processing the remaining predicates.
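// For example, setting the annotation value to "*" disables all predicates; individual predicate names can also
// be listed to disable only those (see DisabledPredicates for the accepted format).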
if disabledPredicates.Has(predicate.name) || disabledPredicates.Has("*") {
ulog.FromContext(ctx.ctx).Info("Warning: disabling upgrade predicate because of annotation", "predicate", predicate.name, "namespace", ctx.es.Namespace, "es_name", ctx.es.Name)
continue
}
// Skip this Pod, it can't be deleted for the moment
return &failedPredicate{
pod: candidate.Name,
predicate: predicate.name,
}, nil
}
}
// All predicates passed!
return nil, nil
}