pkg/controller/elasticsearch/version/zen2/initial_master_nodes.go (137 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package zen2 import ( "context" "strings" pkgerrors "github.com/pkg/errors" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/bootstrap" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec" es_sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/set" ) const ( // InitialMasterNodesAnnotation is applied on the Elasticsearch resource while a cluster is // bootstrapping zen2, and removed when bootstrapping is done. initialMasterNodesAnnotation = "elasticsearch.k8s.elastic.co/initial-master-nodes" ) // SetupInitialMasterNodes sets the `cluster.initial_master_nodes` configuration setting on // zen2-compatible master nodes from nodeSpecResources if necessary. // This is only necessary when bootstrapping a new zen2 cluster, or when upgrading a single zen1 master. // Rolling upgrades from eg. v6 to v7 do not need that setting. // It ensures `cluster.initial_master_nodes` does not vary over time, when this function gets called multiple times. func SetupInitialMasterNodes(ctx context.Context, es esv1.Elasticsearch, k8sClient k8s.Client, nodeSpecResources nodespec.ResourcesList) error { // if the cluster is annotated with `cluster.initial_master_nodes` (zen2 bootstrap in progress), // make sure we reuse that value since it is not supposed to vary over time if initialMasterNodes := getInitialMasterNodesAnnotation(es); initialMasterNodes != nil { return patchInitialMasterNodesConfig(ctx, nodeSpecResources, initialMasterNodes) } // in most cases, `cluster.initial_master_nodes` should not be set shouldSetup, err := shouldSetInitialMasterNodes(es, k8sClient, nodeSpecResources) if err != nil { return err } if !shouldSetup { return nil } initialMasterNodes := nodeSpecResources.MasterNodesNames() if len(initialMasterNodes) == 0 { return pkgerrors.Errorf("no master node found to compute `cluster.initial_master_nodes`") } ulog.FromContext(ctx).Info( "Setting `cluster.initial_master_nodes`", "namespace", es.Namespace, "es_name", es.Name, "cluster.initial_master_nodes", strings.Join(initialMasterNodes, ","), ) if err := patchInitialMasterNodesConfig(ctx, nodeSpecResources, initialMasterNodes); err != nil { return err } // keep the computed value in an annotation for reuse in subsequent reconciliations return setInitialMasterNodesAnnotation(ctx, k8sClient, es, initialMasterNodes) } func shouldSetInitialMasterNodes(es esv1.Elasticsearch, k8sClient k8s.Client, nodeSpecResources nodespec.ResourcesList) (bool, error) { if v, err := version.Parse(es.Spec.Version); err != nil || !versionCompatibleWithZen2(v) { // we only care about zen2-compatible clusters here return false, err } // we want to set `cluster.initial_master_nodes` if: // - a new cluster is getting created (not already bootstrapped) if !bootstrap.AnnotatedForBootstrap(es) { return true, nil } // - we're upgrading (effectively restarting) a non-HA zen1 cluster to zen2 return nonHAZen1MasterUpgrade(k8sClient, es, nodeSpecResources) } // RemoveZen2BootstrapAnnotation removes the initialMasterNodesAnnotation (if set) once zen2 is bootstrapped // on the corresponding cluster. func RemoveZen2BootstrapAnnotation(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch, esClient client.Client) (bool, error) { if v, err := version.Parse(es.Spec.Version); err != nil || !versionCompatibleWithZen2(v) { // we only care about zen2-compatible clusters here return false, err } if getInitialMasterNodesAnnotation(es) == nil { // most common case: no annotation set, nothing to do return false, nil } // the cluster was annotated to indicate it is performing a zen2 bootstrap, // let's check if that bootstrap is done so we can remove the annotation isBootstrapped, err := esClient.ClusterBootstrappedForZen2(ctx) if err != nil { return false, err } if !isBootstrapped { // retry later return true, nil } ulog.FromContext(ctx).Info("Zen 2 bootstrap is complete", "namespace", es.Namespace, "es_name", es.Name, ) // remove the annotation to indicate we're done with zen2 bootstrapping delete(es.Annotations, initialMasterNodesAnnotation) return false, k8sClient.Update(ctx, &es) } // patchInitialMasterNodesConfig mutates the configuration of zen2-compatible master nodes // to have the given `cluster.initial_master_nodes` setting. func patchInitialMasterNodesConfig(ctx context.Context, nodeSpecResources nodespec.ResourcesList, initialMasterNodes []string) error { for i, res := range nodeSpecResources { if !label.IsMasterNodeSet(res.StatefulSet) || !IsCompatibleWithZen2(ctx, res.StatefulSet) { // we only care about updating zen2 masters config here continue } if err := nodeSpecResources[i].Config.SetStrings(esv1.ClusterInitialMasterNodes, initialMasterNodes...); err != nil { return err } } return nil } // nonHAZen1MasterUpgrade returns true if expected nodes in nodeSpecResources will lead to upgrading // the one or two zen1-compatible master nodes currently running in the es cluster. // As we upgrade all nodes at once in one or two node clusters initial master nodes needs to be set as there is no // existing cluster to join once all v6 nodes have been terminated. func nonHAZen1MasterUpgrade(c k8s.Client, es esv1.Elasticsearch, nodeSpecResources nodespec.ResourcesList) (bool, error) { // looking for a non-HA master node setup... masters, err := es_sset.GetActualMastersForCluster(c, es) if err != nil { return false, err } if len(masters) > 2 { return false, nil } currentMasterNames := set.Make() for _, currentMaster := range masters { currentMasterNames.Add(currentMaster.Name) // ...not compatible with zen2... v, err := label.ExtractVersion(currentMaster.Labels) if err != nil { return false, err } // at least one master is already on Zen 2 if versionCompatibleWithZen2(v) { return false, nil } } // ...that will be replaced targetMasters := set.Make() for _, res := range nodeSpecResources { if label.IsMasterNodeSet(res.StatefulSet) { targetMasters.MergeWith(set.Make(sset.PodNames(res.StatefulSet)...)) } } if targetMasters.Count() == 0 { return false, nil } if targetMasters.Count() > 2 { // Covers the case where the user is upgrading to zen2 + adding more masters simultaneously. // Additional masters will get created before the existing one gets upgraded/restarted. return false, nil } if currentMasterNames.Diff(targetMasters).Count() > 0 { // Covers the case where the existing masters are replaced by other masters in a different NodeSet. // The new master will be created before the existing one gets removed. return false, nil } // one or two zen1 masters, will be replaced by a one or two zen2 master with the same name return true, nil } // getInitialMasterNodesAnnotation parses the `cluster.initial_master_nodes` value from // annotations on es, or returns nil if not set. func getInitialMasterNodesAnnotation(es esv1.Elasticsearch) []string { var nodes []string if value := es.Annotations[initialMasterNodesAnnotation]; value != "" { nodes = strings.Split(value, ",") } return nodes } // setInitialMasterNodesAnnotation sets initialMasterNodesAnnotation on the given es resource to initialMasterNodes, // and updates the es resource in the apiserver. func setInitialMasterNodesAnnotation(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch, initialMasterNodes []string) error { if es.Annotations == nil { es.Annotations = map[string]string{} } es.Annotations[initialMasterNodesAnnotation] = strings.Join(initialMasterNodes, ",") return k8sClient.Update(ctx, &es) }