pkg/controller/elasticsearch/driver/upgrade_predicates.go (511 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package driver
import (
"context"
"sort"
corev1 "k8s.io/api/core/v1"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/labels"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset"
ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/stringsutil"
)
// getNodeSettings returns the node settings for a given Pod.
func getNodeSettings(
version version.Version,
resourcesList nodespec.ResourcesList,
pod corev1.Pod,
) (esv1.ElasticsearchSettings, error) {
// Get the expected configuration
statefulSetName, _, err := sset.StatefulSetName(pod.Name)
if err != nil {
return esv1.ElasticsearchSettings{}, err
}
resources, err := resourcesList.ForStatefulSet(statefulSetName)
if err != nil {
return esv1.ElasticsearchSettings{}, err
}
nodeCfg, err := resources.Config.Unpack(version)
if err != nil {
return esv1.ElasticsearchSettings{}, err
}
return nodeCfg, nil
}
// getNodesSettings returns the node settings for a given list of Pods.
func getNodesSettings(
version version.Version,
resourcesList nodespec.ResourcesList,
pods ...corev1.Pod,
) ([]esv1.ElasticsearchSettings, error) {
rolesList := make([]esv1.ElasticsearchSettings, len(pods))
for i := range pods {
roles, err := getNodeSettings(version, resourcesList, pods[i])
if err != nil {
return nil, err
}
rolesList[i] = roles
}
return rolesList, nil
}
// hasDependencyInOthers returns true if, for a given node, at least one other node in a slice can be considered as a
// strong dependency and must be upgraded first. A strong dependency is a unidirectional dependency, if a circular
// dependency exists between two nodes the dependency is not considered as a strong one.
func hasDependencyInOthers(node esv1.ElasticsearchSettings, others []esv1.ElasticsearchSettings) bool {
if !node.Node.CanContainData() {
// node has no tier which requires upgrade prioritization.
return false
}
for _, other := range others {
if !other.Node.CanContainData() {
// this other node has no tier which requires upgrade prioritization.
continue
}
if node.Node.DependsOn(other.Node) && !other.Node.DependsOn(node.Node) {
// candidate has this other node as a strict dependency
return true
}
}
// no dependency or roles are overlapping, we still allow the upgrade
return false
}
// PredicateContext is the set of fields used while determining what set of pods
// can be upgraded when performing a rolling upgrade on an Elasticsearch cluster.
type PredicateContext struct {
es esv1.Elasticsearch
// expected resources (sset, service, config) by StatefulSet
resourcesList nodespec.ResourcesList
expectedMasterNodesNames []string
// healthy Pods are "running" as per k8s API and have joined the ES cluster
healthyPods map[string]corev1.Pod
// Pods based on outdated spec
toUpdate []corev1.Pod
esState ESState
shardLister client.ShardLister
masterUpdateInProgress bool
ctx context.Context
// all Pods for the existing StatefulSets from k8s API
currentPods []corev1.Pod
}
// Predicate is a function that indicates if a Pod can be deleted (or not).
type Predicate struct {
name string
fn func(context PredicateContext, candidate corev1.Pod, deletedPods []corev1.Pod, maxUnavailableReached bool) (bool, error)
}
type failedPredicate struct {
pod string
predicate string
}
type failedPredicates map[string]string
// groupByPredicates groups by predicates the pods that can't be upgraded.
func groupByPredicates(fp failedPredicates) map[string][]string {
podsByPredicates := make(map[string][]string)
for pod, predicate := range fp {
pods := podsByPredicates[predicate]
pods = append(pods, pod)
podsByPredicates[predicate] = pods
}
// Sort pods for stable comparison
for _, pods := range podsByPredicates {
sort.Strings(pods)
}
return podsByPredicates
}
// NewPredicateContext returns a new predicate context for use when
// processing an Elasticsearch rolling upgrade.
func NewPredicateContext(
ctx context.Context,
es esv1.Elasticsearch,
resourcesList nodespec.ResourcesList,
state ESState,
shardLister client.ShardLister,
healthyPods map[string]corev1.Pod,
podsToUpgrade []corev1.Pod,
masterNodesNames []string,
currentPods []corev1.Pod,
) PredicateContext {
return PredicateContext{
es: es,
resourcesList: resourcesList,
expectedMasterNodesNames: masterNodesNames,
healthyPods: healthyPods,
toUpdate: podsToUpgrade,
esState: state,
shardLister: shardLister,
ctx: ctx,
currentPods: currentPods,
}
}
func applyPredicates(
ctx PredicateContext,
candidates []corev1.Pod,
maxUnavailableReached bool,
allowedDeletions int,
reconcileState *reconcile.State,
) (deletedPods []corev1.Pod, err error) {
failedPredicates := make(failedPredicates)
Loop:
for _, candidate := range candidates {
switch predicateErr, err := runPredicates(ctx, candidate, deletedPods, maxUnavailableReached); {
case err != nil:
return deletedPods, err
case predicateErr != nil:
// A predicate has failed on this Pod
failedPredicates[predicateErr.pod] = predicateErr.predicate
default:
candidate := candidate
if label.IsMasterNode(candidate) || willBecomeMasterNode(candidate.Name, ctx.expectedMasterNodesNames) {
// It is a mutation on an already existing or future master.
ctx.masterUpdateInProgress = true
}
// Remove from healthy nodes if it was there
delete(ctx.healthyPods, candidate.Name)
// Append to the deletedPods list
deletedPods = append(deletedPods, candidate)
allowedDeletions--
if allowedDeletions <= 0 {
break Loop
}
}
}
// If some predicates have failed print a summary of the failures to help
// the user to understand why.
groupByPredicates := groupByPredicates(failedPredicates)
if len(failedPredicates) > 0 {
ulog.FromContext(ctx.ctx).Info(
"Cannot restart some nodes for upgrade at this time",
"namespace", ctx.es.Namespace,
"es_name", ctx.es.Name,
"failed_predicates", groupByPredicates)
}
// Also report in the status
reconcileState.RecordPredicatesResult(failedPredicates)
return deletedPods, nil
}
var predicates = [...]Predicate{
{
name: "data_tier_with_higher_priority_must_be_upgraded_first",
fn: func(
context PredicateContext,
candidate corev1.Pod,
deletedPods []corev1.Pod,
_ bool,
) (b bool, e error) {
if candidate.Labels[label.VersionLabelName] == context.es.Spec.Version {
// This predicate is only relevant during version upgrade.
return true, nil
}
currentVersion, err := label.ExtractVersion(candidate.Labels)
if err != nil {
return false, err
}
if currentVersion.LT(version.From(7, 10, 0)) {
// This predicate is only valid for an Elasticsearch node handling data tiers.
return true, nil
}
expectedVersion, err := version.Parse(context.es.Spec.Version)
if err != nil {
return false, err
}
// Get roles for the candidate.
candidateRoles, err := getNodeSettings(expectedVersion, context.resourcesList, candidate)
if err != nil {
return false, err
}
// Get all the roles from the Pods to be upgraded, including the ones already scheduled for an upgrade:
// the intent is to upgrade all the nodes with a same priority before moving on to a tier with a lower priority.
allPods := append(context.toUpdate, deletedPods...)
otherRoles, err := getNodesSettings(expectedVersion, context.resourcesList, allPods...)
if err != nil {
return false, err
}
if hasDependencyInOthers(candidateRoles, otherRoles) {
return false, err
}
return true, nil
},
},
{
// If MaxUnavailable is reached, only allow unhealthy Pods to be deleted.
// This is to prevent a situation where MaxUnavailable is reached and we
// can't make progress even if the user has updated the spec.
name: "do_not_restart_healthy_node_if_MaxUnavailable_reached",
fn: func(
context PredicateContext,
candidate corev1.Pod,
_ []corev1.Pod,
maxUnavailableReached bool,
) (b bool, e error) {
_, healthy := context.healthyPods[candidate.Name]
if maxUnavailableReached && healthy {
return false, nil
}
return true, nil
},
},
{
name: "skip_already_terminating_pods",
fn: func(
context PredicateContext,
candidate corev1.Pod,
deletedPods []corev1.Pod,
_ bool,
) (b bool, e error) {
if candidate.DeletionTimestamp != nil {
// Pod is already terminating, skip it
return false, nil
}
return true, nil
},
},
{
// If health is not Green or Yellow only allow unhealthy Pods to be restarted.
// This is intended to unlock some situations where the cluster is not green and
// a Pod has to be restarted a second time.
name: "only_restart_healthy_node_if_green_or_yellow",
fn: func(
context PredicateContext,
candidate corev1.Pod,
_ []corev1.Pod,
_ bool,
) (b bool, e error) {
// Cluster health is retrieved only once from the cluster.
// We rely on "shard conflict" predicate to avoid to delete two ES nodes that share some shards.
health, err := context.esState.Health()
if err != nil {
return false, err
}
if health.Status == esv1.ElasticsearchGreenHealth || health.Status == esv1.ElasticsearchYellowHealth {
return true, nil
}
_, healthy := context.healthyPods[candidate.Name]
if !healthy {
return true, nil
}
return false, nil
},
},
{
// During a rolling upgrade, primary shards assigned to a node running a new version cannot have their
// replicas assigned to a node with the old version. Therefore we must allow some Pods to be restarted
// even if cluster health is Yellow so the replicas can be assigned.
// This predicate checks that the following conditions are met for a candidate:
// * A cluster upgrade is in progress and the candidate version is not up to date
// * All primaries are assigned, only replicas are actually not assigned
// * There are no initializing or relocating shards
// See https://github.com/elastic/cloud-on-k8s/issues/1643
name: "if_yellow_only_restart_upgrading_nodes_with_unassigned_replicas",
fn: func(
context PredicateContext,
candidate corev1.Pod,
_ []corev1.Pod,
_ bool,
) (b bool, e error) {
health, err := context.esState.Health()
if err != nil {
return false, err
}
_, healthyNode := context.healthyPods[candidate.Name]
if health.Status != esv1.ElasticsearchYellowHealth || !healthyNode {
// This predicate is only relevant on healthy node if cluster health is yellow
return true, nil
}
if len(context.currentPods) == 1 {
// If the cluster is a single node cluster, allow restart even if there is no version difference
return true, nil
}
version := candidate.Labels[label.VersionLabelName]
if version == context.es.Spec.Version {
// Restart in yellow state is only allowed during version upgrade
return false, nil
}
// This candidate needs a version upgrade, check if the primaries are assigned and shards are not moving or
// initializing
return isSafeToRoll(health), nil
},
},
{
// We may need to delete nodes in a yellow cluster, but not if they contain the only replica
// of a shard since it would make the cluster go red.
name: "require_started_replica",
fn: func(
context PredicateContext,
candidate corev1.Pod,
_ []corev1.Pod,
_ bool,
) (b bool, e error) {
health, err := context.esState.Health()
if err != nil {
return false, err
}
_, healthyNode := context.healthyPods[candidate.Name]
if len(context.currentPods) == 1 && health.Status == esv1.ElasticsearchYellowHealth && healthyNode {
// If the cluster is a healthy single node cluster, replicas can not be started, allow the upgrade
return true, nil
}
allShards, err := context.shardLister.GetShards(context.ctx)
if err != nil {
return false, err
}
// We maintain two data structures to record:
// * The total number of replicas for a shard
// * How many of them are STARTED
startedReplicas := make(map[string]int)
replicas := make(map[string]int)
for _, shard := range allShards {
if shard.NodeName == candidate.Name {
continue
}
shardKey := shard.Key()
replicas[shardKey]++
if shard.State == client.STARTED {
startedReplicas[shardKey]++
}
}
// Do not delete a node with a Primary if there is not at least one STARTED replica
shardsByNode := allShards.GetShardsByNode()
shardsOnCandidate := shardsByNode[candidate.Name]
for _, shard := range shardsOnCandidate {
if !shard.IsPrimary() {
continue
}
shardKey := shard.Key()
numReplicas := replicas[shardKey]
assignedReplica := startedReplicas[shardKey]
// We accept here that there will be some unavailability if an index is configured with zero replicas
if numReplicas > 0 && assignedReplica == 0 {
// If this node is deleted there will be no more shards available
return false, nil
}
}
return true, nil
},
},
{
// One master at a time
name: "one_master_at_a_time",
fn: func(
context PredicateContext,
candidate corev1.Pod,
_ []corev1.Pod,
_ bool,
) (b bool, e error) {
// If candidate is not a master then we just check if it will become a master
// In this case we account for a master creation as we want to avoid creating more
// than one master at a time.
if !label.IsMasterNode(candidate) {
if willBecomeMasterNode(candidate.Name, context.expectedMasterNodesNames) {
return !context.masterUpdateInProgress, nil
}
// It is just a data node and it will not become a master: we don't care
return true, nil
}
// There is a current master scheduled for deletion
if context.masterUpdateInProgress {
return false, nil
}
// If candidate is already a master and is not healthy we want to give it a chance to restart anyway
// even if it is leaving the control plane.
_, healthy := context.healthyPods[candidate.Name]
if !healthy {
return true, nil
}
// If Pod is not an expected master it means that we are downscaling the masters
// by changing the type of the node.
// In this case we still check that other masters are healthy to avoid degrading the situation.
if !willBecomeMasterNode(candidate.Name, context.expectedMasterNodesNames) {
// We still need to ensure that others masters are healthy
for _, actualMaster := range label.FilterMasterNodePods(context.currentPods) {
_, healthyMaster := context.healthyPods[actualMaster.Name]
if !healthyMaster {
ulog.FromContext(context.ctx).V(1).Info(
"Can't permanently remove a master in a rolling upgrade if there is an other unhealthy master",
"namespace", candidate.Namespace,
"candidate", candidate.Name,
"unhealthy", actualMaster.Name,
)
return false, nil
}
}
return true, nil
}
// Get the expected masters
expectedMasters := len(context.expectedMasterNodesNames)
// Get the healthy masters
healthyMasters := 0
for _, pod := range context.healthyPods {
if label.IsMasterNode(pod) {
healthyMasters++
}
}
// We are relying here on the expectations and on the checks above that give us
// the guarantee that there is no upscale or downscale in progress.
// The condition to update an existing master is to have all the masters in a healthy state.
if healthyMasters == expectedMasters {
return true, nil
}
ulog.FromContext(context.ctx).V(1).Info(
"Cannot delete master for rolling upgrade",
"expected_healthy_masters", expectedMasters,
"actually_healthy_masters", healthyMasters,
)
return false, nil
},
},
{
// Force an upgrade of all the master-ineligible nodes before upgrading the last master-eligible
name: "do_not_delete_last_master_if_all_master_ineligible_nodes_are_not_upgraded",
fn: func(
context PredicateContext,
candidate corev1.Pod,
_ []corev1.Pod,
_ bool,
) (b bool, e error) {
// If candidate is not a master then we don't care
if !label.IsMasterNode(candidate) {
return true, nil
}
for _, pod := range context.toUpdate {
if candidate.Name == pod.Name {
continue
}
if label.IsMasterNode(pod) {
// There are some other masters to upgrades, allow this one to be deleted
return true, nil
}
}
// This is the last master, check if all master-ineligible nodes are up-to-date
for _, pod := range context.toUpdate {
if candidate.Name == pod.Name {
continue
}
if !label.IsMasterNode(pod) {
// There's still some master-ineligible nodes to update
return false, nil
}
}
return true, nil
},
},
{
// We should not delete 2 Pods with the same shards
name: "do_not_delete_pods_with_same_shards",
fn: func(
context PredicateContext,
candidate corev1.Pod,
deletedPods []corev1.Pod,
_ bool,
) (b bool, e error) {
if len(deletedPods) == 0 {
// Do not do unnecessary request
return true, nil
}
shards, err := context.shardLister.GetShards(context.ctx)
if err != nil {
return true, err
}
shardsByNode := shards.GetShardsByNode()
shardsOnCandidate, ok := shardsByNode[candidate.Name]
if !ok {
// No shards on this node
return true, nil
}
for _, deletedPod := range deletedPods {
shardsOnDeletedPod, ok := shardsByNode[deletedPod.Name]
if !ok {
// No shards on the deleted pod
continue
}
if conflictingShards(shardsOnCandidate, shardsOnDeletedPod) {
return false, nil
}
}
return true, nil
},
},
{
name: "do_not_delete_all_members_of_a_tier",
fn: func(
context PredicateContext,
candidate corev1.Pod,
_ []corev1.Pod,
_ bool,
) (bool, error) {
if _, exists := context.healthyPods[candidate.Name]; !exists {
// there is no point in keeping an unhealthy Pod as it does not contribute to tier availability
return true, nil
}
healthyPodRoleHisto := map[labels.TrueFalseLabel]int{}
currentPodRoleHisto := map[labels.TrueFalseLabel]int{}
// look at the current pods because we want to prevent taking away from current capacity for a tier
// not future capacity as per the expected Pod definitions expressed in the resourcesList
for _, pod := range context.currentPods {
// ignore voting_only and master we are handling those in dedicated predicates
forEachNonMasterRole(pod, func(role labels.TrueFalseLabel) {
currentPodRoleHisto[role]++
})
}
// look at the healthy Pods excluding the candidate (which is part of this list)
// this allows us to figure out what would be the remaining Pods assuming we remove the candidate
for _, pod := range context.healthyPods {
if pod.Name == candidate.Name {
continue
}
forEachNonMasterRole(pod, func(role labels.TrueFalseLabel) {
healthyPodRoleHisto[role]++
})
}
for _, role := range label.NonMasterRoles {
if role.HasValue(true, candidate.Labels) {
healthy := healthyPodRoleHisto[role]
current := currentPodRoleHisto[role]
if current == 1 {
// only Pod with this role: OK to delete
continue
}
if healthy <= 0 {
ulog.FromContext(context.ctx).V(1).Info(
"Delaying upgrade for Pod to ensure tier availability",
"node_role", role,
"namespace", candidate.Namespace,
"candidate", candidate.Name,
"healthy_pods_with_role", healthy,
)
return false, nil
}
}
}
return true, nil
},
},
}
func forEachNonMasterRole(pod corev1.Pod, f func(falseLabel labels.TrueFalseLabel)) {
for _, role := range label.NonMasterRoles {
if role.HasValue(true, pod.Labels) {
f(role)
}
}
}
func willBecomeMasterNode(name string, masters []string) bool {
return stringsutil.StringInSlice(name, masters)
}
func conflictingShards(shards1, shards2 []client.Shard) bool {
for _, shards1 := range shards1 {
for _, shards2 := range shards2 {
if shards1.Index == shards2.Index && shards1.Shard == shards2.Shard {
return true
}
}
}
return false
}
// IsSafeToRoll indicates that a rolling update can continue with the next node if
// - no relocating or initializing shards or shards being fetched
// - all primaries allocated
// only reliable if Status result was created with wait_for_events=languid
// so that there are no pending initialisations in the task queue
func isSafeToRoll(health client.Health) bool {
return !health.HasShardActivity() && // no shard activity
health.Status != esv1.ElasticsearchRedHealth // all primaries allocated
}