pkg/controller/elasticsearch/driver/upscale.go (124 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package driver

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"

	esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/expectations"
	sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/settings"
	es_sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version/zen1"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version/zen2"
	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
)

// upscaleCtx bundles the dependencies handed to HandleUpscaleAndSpecChanges and
// to the helpers it calls in this file.
type upscaleCtx struct {
	// parentCtx is the context forwarded to every reconciliation call made
	// during the upscale phase.
	parentCtx context.Context
	// k8sClient is the client forwarded to the config, service, volume
	// expansion and StatefulSet reconciliation calls.
	k8sClient k8s.Client
	// es is the Elasticsearch resource being reconciled.
	es esv1.Elasticsearch
	// esState is not read directly by the functions in this file.
	// NOTE(review): presumably consumed by newUpscaleState - confirm.
	esState ESState
	// expectations is passed to es_sset.ReconcileStatefulSet.
	expectations *expectations.Expectations
	// validateStorageClass is passed through to handleVolumeExpansion.
	validateStorageClass bool
	// upscaleReporter receives the names of the pods expected to be created.
	upscaleReporter *reconcile.UpscaleReporter
}

// UpscaleResults is the outcome of HandleUpscaleAndSpecChanges.
type UpscaleResults struct {
	// ActualStatefulSets is the input list of actual StatefulSets updated with
	// every StatefulSet reconciled during the call.
	ActualStatefulSets es_sset.StatefulSetList
	// Requeue is set when at least one StatefulSet is scheduled for recreation
	// and its spec reconciliation was therefore skipped.
	Requeue bool
}

// HandleUpscaleAndSpecChanges reconciles expected NodeSet resources.
// It does: // - create any new StatefulSets // - update existing StatefulSets specification, to be used for future pods rotation // - upscale StatefulSet for which we expect more replicas // - limit master node creation to one at a time // - resize (inline) existing PVCs to match new StatefulSet storage reqs and schedule the StatefulSet recreation // It does not: // - perform any StatefulSet downscale (left for downscale phase) // - perform any pod upgrade (left for rolling upgrade phase) func HandleUpscaleAndSpecChanges( ctx upscaleCtx, actualStatefulSets es_sset.StatefulSetList, expectedResources nodespec.ResourcesList, ) (UpscaleResults, error) { results := UpscaleResults{} // Set the list of expected new nodes in the status early. This is to ensure that the list of expected nodes to be // created is surfaced in the status even if an error occurs later in the upscale process. ctx.upscaleReporter.RecordNewNodes(podsToCreate(actualStatefulSets, expectedResources.StatefulSets())) // adjust expected replicas to control nodes creation and deletion adjusted, err := adjustResources(ctx, actualStatefulSets, expectedResources) if err != nil { return results, fmt.Errorf("adjust resources: %w", err) } // reconcile all resources for _, res := range adjusted { res := res if err := settings.ReconcileConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config); err != nil { return results, fmt.Errorf("reconcile config: %w", err) } if _, err := common.ReconcileService(ctx.parentCtx, ctx.k8sClient, &res.HeadlessService, &ctx.es); err != nil { return results, fmt.Errorf("reconcile service: %w", err) } if actualSset, exists := actualStatefulSets.GetByName(res.StatefulSet.Name); exists { recreateSset, err := handleVolumeExpansion(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, actualSset, ctx.validateStorageClass) if err != nil { return results, fmt.Errorf("handle volume expansion: %w", err) } if recreateSset { // The StatefulSet is scheduled for 
recreation: let's requeue before attempting any further spec change. results.Requeue = true continue } } reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, ctx.expectations) if err != nil { return results, fmt.Errorf("reconcile StatefulSet: %w", err) } // update actual with the reconciled ones for next steps to work with up-to-date information actualStatefulSets = actualStatefulSets.WithStatefulSet(reconciled) } results.ActualStatefulSets = actualStatefulSets return results, nil } func podsToCreate( actualStatefulSets, expectedStatefulSets es_sset.StatefulSetList, ) []string { var pods []string for _, expectedStatefulSet := range expectedStatefulSets { actualSset, _ := actualStatefulSets.GetByName(expectedStatefulSet.Name) expectedReplicas := sset.GetReplicas(expectedStatefulSet) for expectedReplicas > sset.GetReplicas(actualSset) { pods = append(pods, sset.PodName(expectedStatefulSet.Name, expectedReplicas-1)) expectedReplicas-- } } return pods } func adjustResources( ctx upscaleCtx, actualStatefulSets es_sset.StatefulSetList, expectedResources nodespec.ResourcesList, ) (nodespec.ResourcesList, error) { upscaleState := newUpscaleState(ctx, actualStatefulSets, expectedResources) adjustedResources := make(nodespec.ResourcesList, 0, len(expectedResources)) for _, nodeSpecRes := range expectedResources { adjusted, err := adjustStatefulSetReplicas(upscaleState, actualStatefulSets, *nodeSpecRes.StatefulSet.DeepCopy()) if err != nil { return nil, err } nodeSpecRes.StatefulSet = adjusted adjustedResources = append(adjustedResources, nodeSpecRes) } // adapt resources configuration to match adjusted replicas if err := adjustZenConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, adjustedResources); err != nil { return nil, fmt.Errorf("adjust discovery config: %w", err) } return adjustedResources, nil } func adjustZenConfig(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch, resources nodespec.ResourcesList) error { 
// patch configs to consider zen1 minimum master nodes if err := zen1.SetupMinimumMasterNodesConfig(ctx, k8sClient, es, resources); err != nil { return err } // patch configs to consider zen2 initial master nodes return zen2.SetupInitialMasterNodes(ctx, es, k8sClient, resources) } // adjustStatefulSetReplicas updates the replicas count in expected according to // what is allowed by the upscaleState, that may be mutated as a result. func adjustStatefulSetReplicas( upscaleState *upscaleState, actualStatefulSets es_sset.StatefulSetList, expected appsv1.StatefulSet, ) (appsv1.StatefulSet, error) { actual, alreadyExists := actualStatefulSets.GetByName(expected.Name) expectedReplicas := sset.GetReplicas(expected) actualReplicas := sset.GetReplicas(actual) if actualReplicas < expectedReplicas { return upscaleState.limitNodesCreation(actual, expected) } if alreadyExists && expectedReplicas < actualReplicas { // this is a downscale. // We still want to update the sset spec to the newest one, but leave scaling down as it's done later. nodespec.UpdateReplicas(&expected, actual.Spec.Replicas) } return expected, nil }