pkg/controller/elasticsearch/driver/upgrade.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package driver
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/expectations"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version"
esclient "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/shutdown"
es_sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
)
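// handleUpgrades orchestrates the restart of Elasticsearch nodes whose Pods must be recreated with a new spec:
// it verifies expectations, computes the Pods to upgrade, prepares the cluster for node restarts, deletes Pods
// (a rolling upgrade by default, a full restart for non-HA clusters undergoing a version upgrade), and requeues
// while the upgrade is in progress.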
func (d *defaultDriver) handleUpgrades(
ctx context.Context,
esClient esclient.Client,
esState ESState,
expectedResources nodespec.ResourcesList,
) *reconciler.Results {
results := &reconciler.Results{}
log := ulog.FromContext(ctx)
// We need to check that all the expectations are satisfied before continuing.
// This is to be sure that none of the previous steps has changed the state and
// that we are not running with a stale cache.
ok, reason, err := d.expectationsSatisfied(ctx)
if err != nil {
return results.WithError(err)
}
if !ok {
reason := fmt.Sprintf("Nodes upgrade: %s", reason)
return results.WithReconciliationState(defaultRequeue.WithReason(reason))
}
// Get the pods to upgrade
statefulSets, err := es_sset.RetrieveActualStatefulSets(d.Client, k8s.ExtractNamespacedName(&d.ES))
if err != nil {
return results.WithError(err)
}
podsToUpgrade, err := podsToUpgrade(d.Client, statefulSets)
if err != nil {
return results.WithError(err)
}
	// Get the healthy Pods (ready from a Kubernetes point of view and members of the ES cluster)
healthyPods, err := healthyPods(d.Client, statefulSets, esState)
if err != nil {
return results.WithError(err)
}
	nodeNameToID, err := esState.NodeNameToID()
	if err != nil {
		// Record the error on the results but keep reconciling: the steps below then operate
		// on a possibly incomplete node name to node ID mapping.
		results.WithError(err)
	}
logger := log.WithValues("namespace", d.ES.Namespace, "es_name", d.ES.Name)
nodeShutdown := shutdown.NewNodeShutdown(esClient, nodeNameToID, esclient.Restart, d.ES.ResourceVersion, logger)
	// Maybe re-enable shards allocation and delete shutdowns if upgraded nodes are back in the cluster.
if results.WithResults(d.maybeCompleteNodeUpgrades(ctx, esClient, esState, nodeShutdown)).HasError() {
return results
}
// Get the list of pods currently existing in the StatefulSetList
currentPods, err := statefulSets.GetActualPods(d.Client)
if err != nil {
return results.WithError(err)
}
expectedMasters := expectedResources.MasterNodesNames()
// Maybe upgrade some of the nodes.
upgrade := newUpgrade(
ctx,
d,
statefulSets,
expectedResources,
esClient,
esState,
nodeShutdown,
expectedMasters,
podsToUpgrade,
healthyPods,
currentPods,
)
var deletedPods []corev1.Pod
isVersionUpgrade, err := isVersionUpgrade(d.ES)
if err != nil {
return results.WithError(err)
}
shouldDoFullRestartUpgrade := isNonHACluster(currentPods, expectedMasters) && isVersionUpgrade
if shouldDoFullRestartUpgrade {
// unconditional full cluster upgrade
deletedPods, err = run(upgrade.DeleteAll)
} else {
// regular rolling upgrade
deletedPods, err = run(upgrade.Delete)
}
if err != nil {
return results.WithError(err)
}
if len(deletedPods) > 0 {
		// Some Pods have just been deleted; we don't need to try to enable shards allocation.
return results.WithReconciliationState(defaultRequeue.WithReason("Nodes upgrade in progress"))
}
if len(podsToUpgrade) > len(deletedPods) {
		// Some Pods have not been updated yet; ensure that we retry later.
results.WithReconciliationState(defaultRequeue.WithReason("Nodes upgrade in progress"))
}
return results
}
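// upgradeCtx carries the state shared by the upgrade decision logic: the Elasticsearch resource and clients,
// the actual StatefulSets and Pods, and the node shutdown orchestrator.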
type upgradeCtx struct {
parentCtx context.Context
client k8s.Client
ES esv1.Elasticsearch
resourcesList nodespec.ResourcesList
statefulSets es_sset.StatefulSetList
esClient esclient.Client
shardLister esclient.ShardLister
nodeShutdown *shutdown.NodeShutdown
esState ESState
expectations *expectations.Expectations
reconcileState *reconcile.State
expectedMasters []string
podsToUpgrade []corev1.Pod
healthyPods map[string]corev1.Pod
currentPods []corev1.Pod
}
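// newUpgrade assembles the upgradeCtx for a single reconciliation attempt. Note that the Elasticsearch client
// also acts as the shard lister.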
func newUpgrade(
ctx context.Context,
d *defaultDriver,
statefulSets es_sset.StatefulSetList,
resourcesList nodespec.ResourcesList,
esClient esclient.Client,
esState ESState,
nodeShutdown *shutdown.NodeShutdown,
	expectedMasters []string,
podsToUpgrade []corev1.Pod,
healthyPods map[string]corev1.Pod,
currentPods []corev1.Pod,
) upgradeCtx {
return upgradeCtx{
parentCtx: ctx,
client: d.Client,
ES: d.ES,
statefulSets: statefulSets,
resourcesList: resourcesList,
esClient: esClient,
shardLister: esClient,
nodeShutdown: nodeShutdown,
esState: esState,
expectations: d.Expectations,
reconcileState: d.ReconcileState,
		expectedMasters: expectedMasters,
podsToUpgrade: podsToUpgrade,
healthyPods: healthyPods,
currentPods: currentPods,
}
}
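// run executes the given Pod deletion function and treats Kubernetes conflict and not-found errors as benign:
// both indicate a stale cache or a Pod already deleted by someone else (for example the StatefulSet controller).
// Both deletion strategies used in handleUpgrades are funneled through it:
//
//	deletedPods, err = run(upgrade.Delete)    // regular rolling upgrade
//	deletedPods, err = run(upgrade.DeleteAll) // full cluster restart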
func run(upgrade func() ([]corev1.Pod, error)) ([]corev1.Pod, error) {
deletedPods, err := upgrade()
if apierrors.IsConflict(err) || apierrors.IsNotFound(err) {
// Cache is not up to date or Pod has been deleted by someone else
// (could be the StatefulSet controller)
		// TODO: should we at least log this one in debug mode?
return deletedPods, nil
}
	return deletedPods, err
}
// isNonHACluster returns true if the expected and actual numbers of master nodes indicate a quorum that cannot
// tolerate the loss of any node. In that case a regular rolling upgrade might not be possible, especially when
// doing a major version upgrade.
func isNonHACluster(actualPods []corev1.Pod, expectedMasters []string) bool {
if len(expectedMasters) > 2 {
return false
}
actualMasters := label.FilterMasterNodePods(actualPods)
return len(actualMasters) <= 2
}
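// For example (illustrative sketch, hypothetical names): with one or two master-eligible nodes the quorum
// cannot survive the loss of any of them, so
//
//	isNonHACluster(pods, []string{"master-0"})          // true, if at most 2 actual masters are running
//	isNonHACluster(pods, []string{"m-0", "m-1", "m-2"}) // false: 3 masters tolerate the loss of one
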
// isVersionUpgrade returns true if a spec change contains a version upgrade.
func isVersionUpgrade(es esv1.Elasticsearch) (bool, error) {
specVersion, err := version.Parse(es.Spec.Version)
if err != nil {
return false, err
}
statusVersion, err := version.Parse(es.Status.Version)
if err != nil {
return false, err
}
return specVersion.GT(statusVersion), nil
}
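// For example (hypothetical values): Spec.Version "8.12.0" against Status.Version "8.11.2" is a version
// upgrade, while "8.12.0" against "8.12.0" (a plain spec change) is not.

// healthyPods returns the Pods of the given StatefulSets that are ready from a Kubernetes point of view
// and have joined the Elasticsearch cluster, keyed by Pod name.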
func healthyPods(
client k8s.Client,
statefulSets es_sset.StatefulSetList,
esState ESState,
) (map[string]corev1.Pod, error) {
healthyPods := make(map[string]corev1.Pod)
currentPods, err := statefulSets.GetActualPods(client)
if err != nil {
return nil, err
}
for _, pod := range currentPods {
if !pod.DeletionTimestamp.IsZero() || !k8s.IsPodReady(pod) {
continue
}
// has the node joined the cluster yet?
inCluster, err := esState.NodesInCluster([]string{pod.Name})
if err != nil {
return nil, err
}
if inCluster {
healthyPods[pod.Name] = pod
}
}
return healthyPods, nil
}
// podsToUpgrade returns, for all StatefulSets, the Pods whose controller-revision-hash label differs from the
// StatefulSet's .status.updateRevision, i.e. the Pods that still need to be deleted and recreated with the new spec.
func podsToUpgrade(
client k8s.Client,
statefulSets es_sset.StatefulSetList,
) ([]corev1.Pod, error) {
var toUpgrade []corev1.Pod
for _, statefulSet := range statefulSets {
if statefulSet.Status.UpdateRevision == "" {
// no upgrade scheduled
continue
}
		// Inspect each Pod from the highest ordinal down to 0, mirroring the order in which the
		// StatefulSet controller rolls out Pod upgrades.
for idx := sset.GetReplicas(statefulSet) - 1; idx >= 0; idx-- {
// Do we need to upgrade that pod?
podName := sset.PodName(statefulSet.Name, idx)
podRef := types.NamespacedName{Namespace: statefulSet.Namespace, Name: podName}
// retrieve pod to inspect its revision label
var pod corev1.Pod
err := client.Get(context.Background(), podRef, &pod)
if err != nil && !apierrors.IsNotFound(err) {
return toUpgrade, err
}
if apierrors.IsNotFound(err) {
			// Pod does not exist; continue the loop, its absence is accounted for by the deletion driver.
continue
}
if sset.PodRevision(pod) != statefulSet.Status.UpdateRevision {
toUpgrade = append(toUpgrade, pod)
}
}
}
return toUpgrade, nil
}
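// For example (illustrative sketch, hypothetical names): given a StatefulSet "es-data" with 3 replicas where
// only ordinal 2 has already been recreated with the new revision, podsToUpgrade returns "es-data-1" and
// "es-data-0", in that order.

// terminatingPodNames returns the names of the Pods of the given StatefulSets that are currently terminating.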
func terminatingPodNames(client k8s.Client, statefulSets es_sset.StatefulSetList) ([]string, error) {
pods, err := statefulSets.GetActualPods(client)
if err != nil {
return nil, err
}
return k8s.PodNames(k8s.TerminatingPods(pods)), nil
}
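// doFlush requests a flush (8.x and above) or a best-effort synced flush (before 8.0) so that indices recover
// faster when nodes restart.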
func doFlush(ctx context.Context, es esv1.Elasticsearch, esClient esclient.Client) error {
log := ulog.FromContext(ctx)
targetEsVersion, err := version.Parse(es.Spec.Version)
if err != nil {
return err
}
switch {
case targetEsVersion.Major >= 8:
		// Starting with version 8.0, a synced flush is no longer necessary; a normal flush should be used instead.
		// During an upgrade from 7.x to 8.x we may already have at least one Pod running 8.x,
		// hence we check the target version here and not the currently running version.
		// Running a standard flush before 8.x is safe, just less optimal.
log.Info("Requesting a flush", "es_name", es.Name, "namespace", es.Namespace)
return esClient.Flush(ctx)
default:
// Pre 8.0, we should perform a synced flush (best-effort).
log.Info("Requesting a synced flush", "es_name", es.Name, "namespace", es.Namespace)
err := esClient.SyncedFlush(ctx)
if esclient.IsConflict(err) {
// Elasticsearch returns an error if the synced flush fails due to concurrent indexing operations.
// The HTTP status code in that case will be 409 CONFLICT. We ignore that and consider synced flush best effort.
log.Info("synced flush failed with 409 CONFLICT. Ignoring.", "namespace", es.Namespace, "es_name", es.Name)
return nil
}
return err
}
}
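// maybeCompleteNodeUpgrades runs once the deleted Pods have been recreated: it clears completed node shutdowns
// for nodes that are back in the cluster, re-enables shard allocation if needed (for example after upgrading
// from a version without node shutdown support), and requeues until all upgraded nodes have rejoined the cluster.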
func (d *defaultDriver) maybeCompleteNodeUpgrades(
ctx context.Context,
esClient esclient.Client,
esState ESState,
nodeShutdown *shutdown.NodeShutdown,
) *reconciler.Results {
results := &reconciler.Results{}
// Make sure all pods scheduled for upgrade have been upgraded.
// This is a redundant check in the current call hierarchy but makes the invariant explicit and testing easier.
done, reason, err := d.expectationsSatisfied(ctx)
if err != nil {
return results.WithError(err)
}
if !done {
reason := fmt.Sprintf("Completing node upgrade: %s", reason)
return results.WithReconciliationState(defaultRequeue.WithReason(reason))
}
statefulSets, err := es_sset.RetrieveActualStatefulSets(d.Client, k8s.ExtractNamespacedName(&d.ES))
if err != nil {
return results.WithError(err)
}
	// Also make sure that when cleaning up node shutdowns we don't remove shutdown records for terminating Pods.
	// The expectation mechanism covers planned spec changes for Pods. However, Pods might also be deleted due to
	// external factors like Kubernetes node upgrades or manual admin intervention. We orchestrate node shutdown in
	// these cases via a pre-stop hook and don't want to interrupt that process until it completes. This is a
	// best-effort attempt, as the observation of shutdown records in Elasticsearch and Pod deletion might not be
	// in sync due to cache lag.
terminating, err := terminatingPodNames(d.Client, statefulSets)
if err != nil {
return results.WithError(err)
}
	// Once expectations are satisfied we can already delete shutdowns that are complete and whose node is back in
	// the cluster. This prevents completed shutdowns from accumulating and affecting node availability calculations
	// in Elasticsearch, for example for indices with the `auto_expand_replicas` setting.
if supportsNodeShutdown(esClient.Version()) {
// clear all shutdowns of type restart that have completed
results = results.WithError(nodeShutdown.Clear(ctx,
esclient.ShutdownComplete.Applies,
nodeShutdown.OnlyNodesInCluster,
nodeShutdown.OnlyNonTerminatingNodes(terminating),
))
}
// Make sure all nodes scheduled for upgrade are back into the cluster.
nodesInCluster, err := esState.NodesInCluster(statefulSets.PodNames())
if err != nil {
return results.WithError(err)
}
if !nodesInCluster {
ulog.FromContext(ctx).V(1).Info(
"Some upgraded nodes are not back in the cluster yet, cannot complete node upgrade",
"namespace", d.ES.Namespace,
"es_name", d.ES.Name,
)
return results.WithReconciliationState(defaultRequeue.WithReason("Nodes upgrade: some nodes are not back in the cluster yet"))
}
// we still have to enable shard allocation in cases where we just upgraded from
// a version that did not support node shutdown to a supported version.
results = results.WithResults(d.maybeEnableShardsAllocation(ctx, esClient, esState))
if supportsNodeShutdown(esClient.Version()) {
		// Clear all shutdowns of type restart that have completed, including those where the node is no longer
		// in the cluster or where the node state was lost due to an external event.
results = results.WithError(nodeShutdown.Clear(ctx,
esclient.ShutdownComplete.Applies,
nodeShutdown.OnlyNonTerminatingNodes(terminating),
))
}
return results
}
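// maybeEnableShardsAllocation re-enables shard allocation unless the cluster is fully migrated to node shutdown
// or allocation is already enabled.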
func (d *defaultDriver) maybeEnableShardsAllocation(
ctx context.Context,
esClient esclient.Client,
esState ESState,
) *reconciler.Results {
results := &reconciler.Results{}
// we are fully migrated to node shutdown and do not need this logic anymore
if d.ReconcileState.OrchestrationHints().NoTransientSettings {
return results
}
alreadyEnabled, err := esState.ShardAllocationsEnabled()
if err != nil {
return results.WithError(err)
}
if alreadyEnabled {
return results
}
ulog.FromContext(ctx).Info("Enabling shards allocation", "namespace", d.ES.Namespace, "es_name", d.ES.Name)
if err := esClient.EnableShardAllocation(ctx); err != nil {
return results.WithError(err)
}
return results
}
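// readyToDelete returns true if the given Pod can be deleted: either the cluster predates node shutdown support,
// the Pod is not ready anyway, or its restart shutdown has reached the COMPLETE status.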
func (ctx *upgradeCtx) readyToDelete(pod corev1.Pod) (bool, error) {
if !supportsNodeShutdown(ctx.esClient.Version()) {
		return true, nil // always OK to restart on versions that predate node shutdown support
}
if !k8s.IsPodReady(pod) {
// there is no point in trying to query the shutdown status of a Pod that is not ready
return true, nil
}
response, err := ctx.nodeShutdown.ShutdownStatus(ctx.parentCtx, pod.Name)
if err != nil {
return false, err
}
return response.Status == esclient.ShutdownComplete, nil
}
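// requestNodeRestarts registers a node shutdown of type restart for every ready Pod that is about to be deleted.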
func (ctx *upgradeCtx) requestNodeRestarts(podsToRestart []corev1.Pod) error {
var podNames []string //nolint:prealloc
for _, p := range podsToRestart {
if !k8s.IsPodReady(p) {
			// There is no point in trying to shut down a Pod that is not running.
			// Basing this decision on the cached Kubernetes client's view opens up a few edge cases where a
			// Pod might in fact already be running while the client's cache is not yet up to date. But the
			// trade-off, accepting an ungraceful shutdown in these edge cases in exchange for automatically
			// unblocking configuration rollouts that are stuck on misconfiguration (for example unfulfillable
			// node selectors), seems worth it.
continue
}
podNames = append(podNames, p.Name)
}
	// Note that ReconcileShutdowns cancels ongoing shutdowns when called with an empty podNames slice. That is
	// not a concern here: the rolling upgrade logic exits early if no Pod needs to be rotated.
return ctx.nodeShutdown.ReconcileShutdowns(ctx.parentCtx, podNames, k8s.PodNames(k8s.TerminatingPods(ctx.currentPods)))
}
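// prepareClusterForNodeRestart readies the cluster before Pods are deleted: on versions that support it, restart
// shutdowns are registered; on older versions, shard allocation is disabled and a flush is requested instead.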
func (ctx *upgradeCtx) prepareClusterForNodeRestart(podsToUpgrade []corev1.Pod) error {
	// Use esClient.Version() here as we want the minimal version in the cluster, not the one in the spec.
if supportsNodeShutdown(ctx.esClient.Version()) {
return ctx.requestNodeRestarts(podsToUpgrade)
}
// Disable shard allocations to avoid shards moving around while the node is temporarily down
shardsAllocationEnabled, err := ctx.esState.ShardAllocationsEnabled()
if err != nil {
return err
}
if shardsAllocationEnabled {
ulog.FromContext(ctx.parentCtx).Info("Disabling shards allocation", "es_name", ctx.ES.Name, "namespace", ctx.ES.Namespace)
if err := ctx.esClient.DisableReplicaShardsAllocation(ctx.parentCtx); err != nil {
return err
}
}
// Request a flush to optimize indices recovery when the node restarts.
return doFlush(ctx.parentCtx, ctx.ES, ctx.esClient)
}