pkg/controller/elasticsearch/driver/downscale.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package driver
import (
"context"
"errors"
"fmt"
"sort"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/events"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/certificates/transport"
esclient "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/settings"
es_sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version/zen1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version/zen2"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/stringsutil"
)
// HandleDownscale attempts to downscale actual StatefulSets towards expected ones.
func HandleDownscale(
downscaleCtx downscaleContext,
expectedStatefulSets es_sset.StatefulSetList,
actualStatefulSets es_sset.StatefulSetList,
) *reconciler.Results {
results := &reconciler.Results{}
// Retrieve the current list of Pods for this cluster. This list is used to compute the nodes that should eventually be removed,
// and the ones that will be removed in this reconciliation attempt.
actualPods, err := es_sset.GetActualPodsForCluster(downscaleCtx.k8sClient, downscaleCtx.es)
if err != nil {
return results.WithError(err)
}
// Compute the desired downscale, without applying any budget filter, to populate the status and let the user know which nodes should
// eventually be removed, not only in this reconciliation attempt but also in the next ones.
desiredDownscale, _ := podsToDownscale(downscaleCtx.parentCtx, actualPods, downscaleCtx.es, expectedStatefulSets, actualStatefulSets, noDownscaleFilter)
desiredLeavingNodes := leavingNodeNames(desiredDownscale)
downscaleCtx.reconcileState.RecordNodesToBeRemoved(desiredLeavingNodes)
// Compute the desired downscale, applying a budget filter to make sure we only downscale nodes we're allowed to remove.
downscaleState := newDownscaleState(actualPods, downscaleCtx.es)
// compute the list of StatefulSet downscales and deletions to perform
downscales, deletions := calculateDownscales(downscaleCtx.parentCtx, *downscaleState, expectedStatefulSets, actualStatefulSets, downscaleBudgetFilter)
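// Because of the budget filter, each downscale may only target an intermediate replica count;
// the remaining nodes are reported as delayed below and handled in subsequent reconciliation attempts.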
// remove actual StatefulSets that should not exist anymore (already downscaled to 0 in the past)
// this is safe thanks to expectations: we're sure 0 actual replicas means 0 corresponding pods exist
if err := deleteStatefulSets(downscaleCtx.parentCtx, deletions, downscaleCtx.k8sClient, downscaleCtx.es); err != nil {
return results.WithError(err)
}
// initiate the shutdown of nodes that should be removed;
// if leavingNodes is empty, this cancels any ongoing shutdowns
leavingNodes := leavingNodeNames(downscales)
terminatingNodes := k8s.PodNames(k8s.TerminatingPods(actualPods))
if err := downscaleCtx.nodeShutdown.ReconcileShutdowns(downscaleCtx.parentCtx, leavingNodes, terminatingNodes); err != nil {
return results.WithError(err)
}
for _, downscale := range downscales {
// attempt the StatefulSet downscale (may or may not remove nodes)
requeue, err := attemptDownscale(downscaleCtx, downscale, actualStatefulSets)
if err != nil {
return results.WithError(err)
}
if requeue {
// retry downscaling this statefulset later
results.WithReconciliationState(defaultRequeue.WithReason("Downscale in progress"))
}
}
// Ensure that the status mentions the delayed nodes
if delayedLeavingNodes, _ := stringsutil.Difference(desiredLeavingNodes, leavingNodes); len(delayedLeavingNodes) > 0 {
sort.Strings(delayedLeavingNodes)
results.WithReconciliationState(defaultRequeue.WithReason(fmt.Sprintf("Downscale in progress, delayed nodes: %s", delayedLeavingNodes)))
}
return results
}
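// podsToDownscale computes the StatefulSet downscales and deletions implied by the expected vs. actual StatefulSets,
// using the given filter to decide how many of the requested node removals are allowed.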
func podsToDownscale(
ctx context.Context,
actualPods []corev1.Pod,
es esv1.Elasticsearch,
expectedStatefulSets es_sset.StatefulSetList,
actualStatefulSets es_sset.StatefulSetList,
downscaleFilter downscaleFilter,
) ([]ssetDownscale, es_sset.StatefulSetList) {
downscaleState := newDownscaleState(actualPods, es)
// compute the list of StatefulSet downscales and deletions to perform
return calculateDownscales(ctx, *downscaleState, expectedStatefulSets, actualStatefulSets, downscaleFilter)
}
// deleteStatefulSets deletes the given StatefulSets along with their associated resources.
func deleteStatefulSets(ctx context.Context, toDelete es_sset.StatefulSetList, k8sClient k8s.Client, es esv1.Elasticsearch) error {
for _, statefulSet := range toDelete {
if err := deleteStatefulSetResources(ctx, k8sClient, es, statefulSet); err != nil {
return err
}
}
return nil
}
// calculateDownscales compares expected and actual StatefulSets to return a list of StatefulSets
// that can be downscaled (replica decrease) or deleted (no replicas).
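// For example, a StatefulSet at 5 replicas that is expected to reach 3 requests 2 deletions; if the filter
// only allows 1 of them, the resulting downscale has initialReplicas=5, targetReplicas=4 and finalReplicas=3.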
func calculateDownscales(ctx context.Context, state downscaleState, expectedStatefulSets es_sset.StatefulSetList, actualStatefulSets es_sset.StatefulSetList, downscaleFilter downscaleFilter) (downscales []ssetDownscale, deletions es_sset.StatefulSetList) {
expectedStatefulSetsNames := expectedStatefulSets.Names()
for _, actualSset := range actualStatefulSets {
actualReplicas := sset.GetReplicas(actualSset)
expectedSset, shouldExist := expectedStatefulSets.GetByName(actualSset.Name)
expectedReplicas := int32(0)
if shouldExist {
expectedReplicas = sset.GetReplicas(expectedSset)
}
switch {
case !expectedStatefulSetsNames.Has(actualSset.Name) && actualReplicas == 0 && expectedReplicas == 0:
// the StatefulSet should not exist, and currently has no replicas
// it is safe to delete
deletions = append(deletions, actualSset)
case expectedReplicas < actualReplicas:
// the StatefulSet should be downscaled
requestedDeletes := actualReplicas - expectedReplicas
allowedDeletes := downscaleFilter(ctx, &state, actualSset, requestedDeletes)
if allowedDeletes == 0 {
continue
}
downscales = append(downscales, ssetDownscale{
statefulSet: actualSset,
initialReplicas: actualReplicas,
targetReplicas: actualReplicas - allowedDeletes,
finalReplicas: expectedReplicas,
})
default:
// nothing to do
}
}
return downscales, deletions
}
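// downscaleFilter decides how many of the requested node deletions are actually allowed for a given StatefulSet.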
type downscaleFilter func(_ context.Context, _ *downscaleState, _ appsv1.StatefulSet, _ int32) int32
// noDownscaleFilter is a filter which does not restrict any node removal. It can be used to compute the full list of
// Pods which are expected to be deleted.
func noDownscaleFilter(_ context.Context, _ *downscaleState, _ appsv1.StatefulSet, requestedDeletes int32) int32 {
return requestedDeletes
}
// downscaleBudgetFilter is a filter which relies on checkDownscaleInvariants.
// It ensures that we only downscale nodes we're allowed to remove.
// Note that this function may have side effects on the downscaleState and should not be considered idempotent.
func downscaleBudgetFilter(ctx context.Context, state *downscaleState, actualSset appsv1.StatefulSet, requestedDeletes int32) int32 {
allowedDeletes, reason := checkDownscaleInvariants(*state, actualSset, requestedDeletes)
if allowedDeletes == 0 {
ssetLogger(ctx, actualSset).V(1).Info("Cannot downscale StatefulSet", "reason", reason)
return 0
}
state.recordNodeRemoval(actualSset, allowedDeletes)
return allowedDeletes
}
// attemptDownscale attempts to decrement the number of replicas of the given StatefulSet.
// Nodes whose data migration is not complete are not removed.
// A boolean is returned to indicate whether a requeue should be scheduled because the entire downscale could not be performed.
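// For example, if only a downscale from 5 to 4 replicas can be performed while the final target is 3,
// the StatefulSet is updated to 4 replicas and a requeue is requested to continue the downscale later.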
func attemptDownscale(
ctx downscaleContext,
downscale ssetDownscale,
statefulSets es_sset.StatefulSetList,
) (bool, error) {
// adjust the theoretical downscale to one we can safely perform
performable, err := calculatePerformableDownscale(ctx, downscale)
if err != nil {
return true, err
}
if performable.targetReplicas == performable.initialReplicas {
// no downscale can be performed for now, let's requeue
return true, nil
}
// do performable downscale, and requeue if needed
shouldRequeue := performable.targetReplicas != downscale.finalReplicas
return shouldRequeue, doDownscale(ctx, performable, statefulSets)
}
// deleteStatefulSetResources deletes the given StatefulSet along with the corresponding
// headless service, configuration and transport certificates secret.
func deleteStatefulSetResources(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch, statefulSet appsv1.StatefulSet) error {
headlessSvc := nodespec.HeadlessService(&es, statefulSet.Name)
err := k8sClient.Delete(ctx, &headlessSvc)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
err = settings.DeleteConfig(ctx, k8sClient, es.Namespace, statefulSet.Name)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
err = transport.DeleteStatefulSetTransportCertificate(ctx, k8sClient, es.Namespace, statefulSet.Name)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
ssetLogger(ctx, statefulSet).Info("Deleting statefulset")
return k8sClient.Delete(ctx, &statefulSet)
}
// calculatePerformableDownscale adjusts the target replicas of the given downscale to account for nodes
// which cannot be safely removed yet.
// It returns the adjusted downscale, or an error if a node's shutdown status could not be retrieved.
func calculatePerformableDownscale(
ctx downscaleContext,
downscale ssetDownscale,
) (ssetDownscale, error) {
// create another downscale based on the provided one, for which we'll slowly decrease target replicas
performableDownscale := ssetDownscale{
statefulSet: downscale.statefulSet,
initialReplicas: downscale.initialReplicas,
targetReplicas: downscale.initialReplicas, // target set to initial
finalReplicas: downscale.finalReplicas,
}
// iterate over all leaving nodes (ordered by highest ordinal first)
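// targetReplicas only decreases while the leaving nodes, taken highest ordinal first, report ShutdownComplete;
// the first node that is still migrating or stalled stops the loop, so replicas are never reduced past a node that is not done.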
for _, node := range downscale.leavingNodeNames() {
response, err := ctx.nodeShutdown.ShutdownStatus(ctx.parentCtx, node)
if err != nil {
return performableDownscale, fmt.Errorf("while checking shutdown status: %w", err)
}
switch response.Status {
case esclient.ShutdownComplete:
// shutdown, including data migration, is over: allow the pod to be removed
performableDownscale.targetReplicas--
case esclient.ShutdownStalled:
// shutdown stalled: this can require user intervention, bubble up via an event
ctx.reconcileState.
UpdateWithPhase(esv1.ElasticsearchNodeShutdownStalledPhase).
AddEvent(
corev1.EventTypeWarning,
events.EventReasonStalled,
fmt.Sprintf("Requested topology change is stalled. User intervention may be required if this condition persists. %s", response.Explanation),
)
// no need to check other nodes since we remove them in order and this one isn't ready anyway
return performableDownscale, nil
case esclient.ShutdownInProgress:
ctx.reconcileState.
UpdateWithPhase(esv1.ElasticsearchMigratingDataPhase).
AddEvent(
corev1.EventTypeNormal,
events.EventReasonDelayed,
"Requested topology change delayed by data migration. Ensure index settings allow node removal.",
)
// no need to check other nodes since we remove them in order and this one isn't ready anyway
return performableDownscale, nil
case esclient.ShutdownNotStarted:
msg := fmt.Sprintf("Unexpected state. Node shutdown could not be started: %s", response.Explanation)
return performableDownscale, errors.New(msg)
}
}
return performableDownscale, nil
}
// doDownscale schedules nodes removal for the given downscale, and updates zen settings accordingly.
func doDownscale(downscaleCtx downscaleContext, downscale ssetDownscale, actualStatefulSets es_sset.StatefulSetList) error {
ssetLogger(downscaleCtx.parentCtx, downscale.statefulSet).Info(
"Scaling replicas down",
"from", downscale.initialReplicas,
"to", downscale.targetReplicas,
)
if label.IsMasterNodeSet(downscale.statefulSet) {
if err := updateZenSettingsForDownscale(
downscaleCtx.parentCtx,
downscaleCtx.k8sClient,
downscaleCtx.esClient,
downscaleCtx.es,
downscaleCtx.reconcileState,
actualStatefulSets,
downscale.leavingNodeNames()...,
); err != nil {
return err
}
}
nodespec.UpdateReplicas(&downscale.statefulSet, &downscale.targetReplicas)
if err := downscaleCtx.k8sClient.Update(downscaleCtx.parentCtx, &downscale.statefulSet); err != nil {
return err
}
// Expect the updated StatefulSet in the cache for the next reconciliation.
downscaleCtx.expectations.ExpectGeneration(downscale.statefulSet)
return nil
}
// updateZenSettingsForDownscale makes sure zen1 and zen2 settings are updated to account for nodes
// that will soon be removed.
func updateZenSettingsForDownscale(
ctx context.Context,
c k8s.Client,
esClient esclient.Client,
es esv1.Elasticsearch,
reconcileState *reconcile.State,
actualStatefulSets es_sset.StatefulSetList,
excludeNodes ...string,
) error {
// Maybe update zen1 minimum_master_nodes.
if err := maybeUpdateZen1ForDownscale(ctx, c, esClient, es, reconcileState, actualStatefulSets); err != nil {
return err
}
// Maybe update zen2 settings to exclude leaving master nodes from voting.
return zen2.AddToVotingConfigExclusions(ctx, c, esClient, es, excludeNodes)
}
// maybeUpdateZen1ForDownscale updates the zen1 minimum_master_nodes setting if we are downscaling from 2 to 1 master nodes.
func maybeUpdateZen1ForDownscale(
ctx context.Context,
c k8s.Client,
esClient esclient.Client,
es esv1.Elasticsearch,
reconcileState *reconcile.State,
actualStatefulSets es_sset.StatefulSetList) error {
// Check if we have at least one Zen1 compatible pod or StatefulSet in flight.
if zen1compatible, err := zen1.AtLeastOneNodeCompatibleWithZen1(ctx, actualStatefulSets, c, es); !zen1compatible || err != nil {
return err
}
actualMasters, err := es_sset.GetActualMastersForCluster(c, es)
if err != nil {
return err
}
if len(actualMasters) != 2 {
// not in the 2->1 situation
return nil
}
// We are moving from 2 to 1 master nodes, so we need to update minimum_master_nodes before removing
// the 2nd node, otherwise the cluster won't be able to form anymore.
// This is inherently unsafe (it can cause a split brain), but there's no alternative.
// For other situations (e.g. 3 -> 2), it's fine to update minimum_master_nodes after the node is removed
// (this will be done at the next reconciliation, before node removal).
reconcileState.AddEvent(
corev1.EventTypeWarning, events.EventReasonUnhealthy,
"Downscaling from 2 to 1 master nodes: unsafe operation",
)
minimumMasterNodes := 1
return zen1.UpdateMinimumMasterNodesTo(ctx, es, esClient, minimumMasterNodes)
}