pkg/controller/elasticsearch/driver/fixtures.go (326 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package driver import ( "testing" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/utils/ptr" crclient "sigs.k8s.io/controller-runtime/pkg/client" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" common "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/settings" sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/settings" es_sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" ) const ( TestEsName = "TestES" TestEsNamespace = "TestNS" ) type testPod struct { name string version string ssetName string roles []string healthy, toUpgrade, inCluster, terminating bool uid types.UID resourceVersion string finalizers []string } func newTestPod(name string) testPod { return testPod{ name: name, uid: uuid.NewUUID(), resourceVersion: "123", } } func (t testPod) isInCluster(v bool) testPod { t.inCluster = v; return t } func (t testPod) isHealthy(v bool) testPod { t.healthy = v; return t } func (t testPod) needsUpgrade(v bool) testPod { t.toUpgrade = v; return t } func (t testPod) isTerminating(v bool) testPod { t.terminating = v; return t } func (t testPod) withVersion(v string) testPod { t.version = v; return t } func (t testPod) inStatefulset(ssetName string) testPod { t.ssetName = ssetName; return t } func (t testPod) withResourceVersion(rv string) testPod { t.resourceVersion = rv; return t } //nolint:unparam func (t testPod) withFinalizers(f []string) testPod { t.finalizers = f; return t } func (t testPod) withRoles(roles ...esv1.NodeRole) testPod { t.roles = make([]string, len(roles)) for i := range roles { t.roles[i] = string(roles[i]) } return t } // filter to simulate a Pod that has been removed while upgrading // unfortunately fake client does not support predicate type filter func(pod corev1.Pod) bool // -- Filters var nothing = func(pod corev1.Pod) bool { return false } func byName(name string) filter { return func(pod corev1.Pod) bool { return pod.Name == name } } // - Mutations are used to simulate a type change on a set of Pods, e.g. MD -> D or D -> MD type mutation func(pod corev1.Pod) corev1.Pod var noMutation = func(pod corev1.Pod) corev1.Pod { return pod } func removeMasterType(ssetName string) mutation { return func(pod corev1.Pod) corev1.Pod { podSsetname, _, _ := es_sset.StatefulSetName(pod.Name) if podSsetname == ssetName { pod := pod.DeepCopy() label.NodeTypesMasterLabelName.Set(false, pod.Labels) return *pod } return pod } } func addMasterType(ssetName string) mutation { return func(pod corev1.Pod) corev1.Pod { podSsetname, _, _ := es_sset.StatefulSetName(pod.Name) if podSsetname == ssetName { pod := pod.DeepCopy() label.NodeTypesMasterLabelName.Set(true, pod.Labels) return *pod } return pod } } type upgradeTestPods []testPod func newUpgradeTestPods(pods ...testPod) upgradeTestPods { result := make(upgradeTestPods, len(pods)) copy(result, pods) return result } func (u upgradeTestPods) toES(version string, maxUnavailable int, annotations map[string]string) esv1.Elasticsearch { return esv1.Elasticsearch{ ObjectMeta: metav1.ObjectMeta{ Name: TestEsName, Namespace: TestEsNamespace, Annotations: annotations, }, Spec: esv1.ElasticsearchSpec{ Version: version, UpdateStrategy: esv1.UpdateStrategy{ ChangeBudget: esv1.ChangeBudget{ MaxUnavailable: ptr.To[int32](int32(maxUnavailable)), }, }, }, } } // Infer the list of statefulsets from the Pods used in the test func (u upgradeTestPods) toStatefulSetList() es_sset.StatefulSetList { // Get all the statefulsets statefulSets := make(map[string]int32) for _, testPod := range u { name, ordinal, err := es_sset.StatefulSetName(testPod.name) if err != nil { panic(err) } if replicas, found := statefulSets[name]; found { if ordinal > replicas { statefulSets[name] = ordinal } } else { statefulSets[name] = ordinal } } statefulSetList := make(es_sset.StatefulSetList, len(statefulSets)) i := 0 for statefulSet, replica := range statefulSets { statefulSetList[i] = sset.TestSset{Name: statefulSet, ClusterName: TestEsName, Namespace: TestEsNamespace, Replicas: replica + 1}.Build() i++ } return statefulSetList } func (u upgradeTestPods) toClientObjects(version string, maxUnavailable int, f filter, annotations map[string]string) []crclient.Object { var result []crclient.Object for _, testPod := range u { pod := testPod.toPod() if !f(pod) { result = append(result, &pod) } } es := u.toES(version, maxUnavailable, annotations) result = append(result, &es) return result } func (u upgradeTestPods) toCurrentPods() []corev1.Pod { result := make([]corev1.Pod, 0, len(u)) for _, testPod := range u { result = append(result, testPod.toPod()) } return result } func (u upgradeTestPods) toHealthyPods() map[string]corev1.Pod { result := make(map[string]corev1.Pod) for _, testPod := range u { pod := testPod.toPod() if pod.DeletionTimestamp.IsZero() && k8s.IsPodReady(pod) && testPod.inCluster { result[pod.Name] = pod } } return result } // toResourcesList infers the resources from the test Pod list. func (u upgradeTestPods) toResourcesList(t *testing.T) nodespec.ResourcesList { t.Helper() resourcesByStatefulSet := make(map[string]nodespec.Resources) for _, p := range u { statefulSetName, _, err := es_sset.StatefulSetName(p.name) require.NoError(t, err) if _, exists := resourcesByStatefulSet[statefulSetName]; exists { continue } resources := nodespec.Resources{ StatefulSet: appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: statefulSetName, }, }, HeadlessService: corev1.Service{}, Config: settings.CanonicalConfig{CanonicalConfig: common.MustCanonicalConfig(map[string]interface{}{})}, } if p.roles != nil { resources.Config = settings.CanonicalConfig{CanonicalConfig: common.MustCanonicalConfig(map[string]interface{}{"node.roles": p.roles})} } resourcesByStatefulSet[statefulSetName] = resources } resources := make(nodespec.ResourcesList, 0, len(resourcesByStatefulSet)) for _, r := range resourcesByStatefulSet { resources = append(resources, r) } return resources } func (u upgradeTestPods) toUpgrade() []corev1.Pod { var result []corev1.Pod for _, testPod := range u { pod := testPod.toPod() if testPod.toUpgrade { result = append(result, pod) } } return result } func (u upgradeTestPods) podsInCluster() []string { var result []string for _, testPod := range u { pod := testPod.toPod() if testPod.inCluster { result = append(result, pod.Name) } } return result } func (u upgradeTestPods) podNamesToESNodeID() map[string]string { result := make(map[string]string) // to minimize the cognitive overhead during unit testing we are using // pod name as Elasticsearch node ID here. for _, p := range u.podsInCluster() { result[p] = p } return result } func (u upgradeTestPods) toMasters(mutation mutation) []string { var result []string for _, testPod := range u { pod := mutation(testPod.toPod()) if label.IsMasterNode(pod) { result = append(result, pod.Name) } } return result } func names(pods []corev1.Pod) []string { result := make([]string, len(pods)) for i, pod := range pods { result[i] = pod.Name } return result } func (t testPod) toPod() corev1.Pod { var deletionTimestamp *metav1.Time if t.terminating { now := metav1.Now() deletionTimestamp = &now } pod := corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: t.name, Namespace: TestEsNamespace, UID: t.uid, DeletionTimestamp: deletionTimestamp, ResourceVersion: t.resourceVersion, Finalizers: t.finalizers, }, } if t.version == "" { t.version = "7.4.0" } pod.Labels = label.NewPodLabels( types.NamespacedName{ Namespace: TestEsNamespace, Name: TestEsName, }, t.ssetName, version.MustParse(t.version), &esv1.Node{ Roles: t.roles, }, "https", ) if t.healthy { pod.Status = corev1.PodStatus{ Conditions: []corev1.PodCondition{ { Type: corev1.PodReady, Status: corev1.ConditionTrue, }, { Type: corev1.ContainersReady, Status: corev1.ConditionTrue, }, }, } } return pod } func (t testPod) toPodPtr() *corev1.Pod { pod := t.toPod() return &pod } type testESState struct { inCluster []string health client.Health ESState } func (t *testESState) ShardAllocationsEnabled() (bool, error) { return true, nil } func (t *testESState) Health() (client.Health, error) { return t.health, nil } func (t *testESState) NodesInCluster(nodeNames []string) (bool, error) { for _, nodeName := range nodeNames { for _, inClusterPods := range t.inCluster { if nodeName == inClusterPods { return true, nil } } } return false, nil } func newSettings(nodeRoles ...esv1.NodeRole) esv1.ElasticsearchSettings { roles := make([]string, len(nodeRoles)) for i := range nodeRoles { roles[i] = string(nodeRoles[i]) } return esv1.ElasticsearchSettings{ Node: &esv1.Node{ Roles: roles, }, Cluster: esv1.ClusterSettings{}, } } // emptySettingsNode can be used in tests as a node with only the default settings. var emptySettingsNode = esv1.ElasticsearchSettings{}