pkg/controller/elasticsearch/sset/list.go (167 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package sset import ( "context" "fmt" "sort" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/set" ) type StatefulSetList []appsv1.StatefulSet // RetrieveActualStatefulSets returns the list of existing StatefulSets labeled for the given es cluster. // It is sorted using a natural order sort so that algorithms which are using the resulting list are more predictable and stable. func RetrieveActualStatefulSets(c k8s.Client, es types.NamespacedName) (StatefulSetList, error) { var ssets appsv1.StatefulSetList ns := client.InNamespace(es.Namespace) matchLabels := label.NewLabelSelectorForElasticsearchClusterName(es.Name) err := c.List(context.Background(), &ssets, ns, matchLabels) sort.Slice(ssets.Items, func(i, j int) bool { return ssets.Items[i].Name < ssets.Items[j].Name }) return StatefulSetList(ssets.Items), err } // GetByName returns the StatefulSet with the given name, and a bool indicating if the StatefulSet was found. func (l StatefulSetList) GetByName(ssetName string) (appsv1.StatefulSet, bool) { for _, sset := range l { if sset.Name == ssetName { return sset, true } } return appsv1.StatefulSet{}, false } // Names returns the set of StatefulSets names. func (l StatefulSetList) Names() set.StringSet { names := set.Make() for _, statefulSet := range l { names.Add(statefulSet.Name) } return names } // ToUpdate filters the StatefulSetList to the ones having an update revision scheduled. func (l StatefulSetList) ToUpdate() StatefulSetList { toUpdate := StatefulSetList{} for _, s := range l { // When using an OnDelete strategy current revision is never reset to update revision. // Just looking that the revision to detect updates does therefore does not work when reverting // to a previous revision and gives constant false positives after an initial update. // Only updated replicas != replicas expresses the fact that an update is still pending. if s.Status.UpdatedReplicas != s.Status.Replicas { toUpdate = append(toUpdate, s) } } return toUpdate } // PodNames returns the names of the pods for all StatefulSets in the list. func (l StatefulSetList) PodNames() []string { names := make([]string, 0, len(l)) for _, s := range l { names = append(names, sset.PodNames(s)...) } return names } // ExpectedNodeCount returns the sum of replicas of each StatefulSet in the StatefulSetList. func (l StatefulSetList) ExpectedNodeCount() int32 { count := int32(0) for _, s := range l { count += sset.GetReplicas(s) } return count } // ExpectedMasterNodesCount returns the number of master nodes expected from the StatefulSetList. func (l StatefulSetList) ExpectedMasterNodesCount() int32 { count := int32(0) for _, s := range l { if label.IsMasterNodeSet(s) { count += sset.GetReplicas(s) } } return count } // ExpectedDataNodesCount returns the number of data nodes expected from the StatefulSetList. func (l StatefulSetList) ExpectedDataNodesCount() int32 { count := int32(0) for _, s := range l { if label.IsDataNodeSet(s) { count += sset.GetReplicas(s) } } return count } // ExpectedIngestNodesCount returns the number of ingest nodes expected from the StatefulSetList. func (l StatefulSetList) ExpectedIngestNodesCount() int32 { count := int32(0) for _, s := range l { if label.IsIngestNodeSet(s) { count += sset.GetReplicas(s) } } return count } // PVCNames returns the names of PVCs for all pods of the StatefulSetList. func (l StatefulSetList) PVCNames() []string { var pvcNames []string for _, s := range l { podNames := sset.PodNames(s) for _, claim := range s.Spec.VolumeClaimTemplates { for _, podName := range podNames { pvcNames = append(pvcNames, fmt.Sprintf("%s-%s", claim.Name, podName)) } } } return pvcNames } // GetActualPods returns the list of pods currently existing in the StatefulSetList. func (l StatefulSetList) GetActualPods(c k8s.Client) ([]corev1.Pod, error) { allPods := []corev1.Pod{} for _, statefulSet := range l { sset := statefulSet pods, err := GetActualPodsForStatefulSet(c, k8s.ExtractNamespacedName(&sset)) if err != nil { return nil, err } allPods = append(allPods, pods...) } return allPods, nil } // PodReconciliationDone returns true if actual existing pods match what is specified in the StatefulSetList. // It may return false if there are pods in the process of being: // - created (but not there in our resources cache) // - removed (but still there in our resources cache) // Status of the pods (running, error, etc.) is ignored. func (l StatefulSetList) PodReconciliationDone(ctx context.Context, c k8s.Client) (bool, string, error) { for _, statefulset := range l { ok, reason, err := sset.PodReconciliationDone(ctx, c, statefulset, label.StatefulSetNameLabelName) if !ok { return false, reason, err } } return true, "", nil } // PendingReconciliation returns the list of StatefulSets for which status.observedGeneration does not match the metadata.generation. // The status is automatically updated by the StatefulSet controller: if the observedGeneration does not match // the metadata generation, it means the resource has not been processed by the StatefulSet controller yet. // When that happens, other fields in the StatefulSet status (eg. "updateRevision") may not be up to date. func (l StatefulSetList) PendingReconciliation() StatefulSetList { var statefulSetList StatefulSetList for _, s := range l { if s.Generation != s.Status.ObservedGeneration { s := s statefulSetList = append(statefulSetList, s) } } return statefulSetList } // DeepCopy returns a copy of the StatefulSetList with no reference to the original StatefulSetList. func (l StatefulSetList) DeepCopy() StatefulSetList { result := make(StatefulSetList, 0, len(l)) for _, s := range l { result = append(result, *s.DeepCopy()) } return result } // WithStatefulSet returns the StatefulSetList updated to contain the given StatefulSet. // If one already exists with the same namespace & name, it will be replaced. func (l StatefulSetList) WithStatefulSet(statefulSet appsv1.StatefulSet) StatefulSetList { for i := range l { if l[i].Name == statefulSet.Name && l[i].Namespace == statefulSet.Namespace { // replace the existing StatefulSet in the list l[i] = statefulSet return l } } // add a new StatefulSet to the list return append(l, statefulSet) } // AtLeastOneESVersionMatch returns true if at least one StatefulSet's ES version matches the given condition. func (l StatefulSetList) AtLeastOneESVersionMatch(ctx context.Context, condition func(v version.Version) bool) bool { for _, s := range l { if ESVersionMatch(ctx, s, condition) { return true } } return false } // ESVersionMatch returns true if the ES version for this StatefulSet matches the given condition. func ESVersionMatch(ctx context.Context, statefulSet appsv1.StatefulSet, condition func(v version.Version) bool) bool { v, err := GetESVersion(statefulSet) if err != nil { ulog.FromContext(ctx).Error(err, "cannot parse version from StatefulSet", "namespace", statefulSet.Namespace, "name", statefulSet.Name) return false } return condition(v) }