pkg/api/platformapi/allocatorapi/vacate.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package allocatorapi
import (
"context"
"fmt"
"strings"
hashimultierror "github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/elastic/cloud-sdk-go/pkg/api"
"github.com/elastic/cloud-sdk-go/pkg/api/apierror"
"github.com/elastic/cloud-sdk-go/pkg/client/platform_infrastructure"
"github.com/elastic/cloud-sdk-go/pkg/models"
"github.com/elastic/cloud-sdk-go/pkg/multierror"
"github.com/elastic/cloud-sdk-go/pkg/plan"
"github.com/elastic/cloud-sdk-go/pkg/plan/planutil"
"github.com/elastic/cloud-sdk-go/pkg/sync/pool"
"github.com/elastic/cloud-sdk-go/pkg/util"
"github.com/elastic/cloud-sdk-go/pkg/util/ec"
"github.com/elastic/cloud-sdk-go/pkg/util/slice"
)
const (
// PlanPendingMessage is used to discard vacate failures that are caused by a
// plan still being pending on the resource.
PlanPendingMessage = "There is a plan still pending, cancel that or wait for it to complete before restarting"
)
// Vacate drains allocated resource instances away from the specified
// allocators, moving them either to a specific list of target allocators or,
// when that list is empty, to allocators chosen by the constructor.
// If resources are set, only the instances that belong to those IDs are moved.
// If a kind is specified, only the resources that match that kind are moved.
// If neither is specified, all of the resources on the allocator are moved.
// The maximum number of concurrent moves is controlled by the Concurrency
// parameter.
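//
// A minimal usage sketch (VacateParams is defined elsewhere in this package;
// apiClient and device stand in for an already-constructed *api.API and
// output writer, and all values are illustrative):
//
//	err := allocatorapi.Vacate(&allocatorapi.VacateParams{
//		API:          apiClient,
//		Region:       "us-east-1",
//		Allocators:   []string{"allocator-0000000000"},
//		Concurrency:  8,
//		Output:       device,
//		OutputFormat: "text",
//	})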
func Vacate(params *VacateParams) error {
if err := params.Validate(); err != nil {
return err
}
var emptyTimeout pool.Timeout
if params.PoolTimeout == emptyTimeout {
params.PoolTimeout = pool.DefaultTimeout
}
p, err := pool.NewPool(pool.Params{
Size: params.Concurrency,
Run: VacateClusterInPool,
Timeout: params.PoolTimeout,
Writer: params.Output,
})
if err != nil {
return err
}
// Errors reported here are returned by a dry-run execution of the move API
// (the validate-only flag is set) and must not stop the real vacate.
// Instead, validateOnlyErr is combined with the errors from the actual vacate
// at the end of the function.
leftovers, hasWork, validateOnlyErr := moveAllocators(params, p)
if err := p.Start(); err != nil {
return err
}
// If the queue was full prior to starting it, there might be some leftovers;
// keep re-adding them until none remain as the pool clears out its work
// items.
for len(leftovers) > 0 {
leftovers, _ = p.Add(leftovers...)
}
var merr = multierror.NewPrefixed("vacate error")
unpackMultierror(merr, validateOnlyErr)
// Wait until all of the items have been processed and unpack the errors
// from the pooled vacate calls into the multierror.
if err := waitVacateCompletion(p, hasWork); err != nil {
unpackMultierror(merr, err)
}
return multierror.WithFormat(merr.ErrorOrNil(), params.OutputFormat)
}
// moveAllocators ranges over the list of provided allocators and moves the
// nodes off each allocator. It returns any leftovers from full pool queues,
// whether any work was added to the pool, and any errors returned from API
// calls.
func moveAllocators(params *VacateParams, p *pool.Pool) ([]pool.Validator, bool, error) {
var leftovers []pool.Validator
var merr = multierror.NewPrefixed("vacate error")
var hasWork bool
for _, id := range params.Allocators {
left, moved, err := moveNodes(id, params, p)
merr = merr.Append(err)
if len(left) > 0 {
leftovers = append(leftovers, left...)
}
if moved {
hasWork = true
}
}
return leftovers, hasWork, merr.ErrorOrNil()
}
// moveNodes moves all of the nodes off the specified allocator
func moveNodes(id string, params *VacateParams, p *pool.Pool) ([]pool.Validator, bool, error) {
var merr = multierror.NewPrefixed("vacate error")
res, err := params.API.V1API.PlatformInfrastructure.MoveClusters(
platform_infrastructure.NewMoveClustersParams().
WithAllocatorID(id).
WithMoveOnly(params.MoveOnly).
WithContext(api.WithRegion(context.Background(), params.Region)).
WithValidateOnly(ec.Bool(true)),
params.AuthWriter,
)
if err != nil {
return nil, false, merr.Append(VacateError{
AllocatorID: id,
Err: apierror.Wrap(err),
})
}
if err := CheckVacateFailures(res.Payload.Failures, params.ClusterFilter, id); err != nil {
// Errors already wrapped in VacateError
merr = merr.Append(err)
}
work, hasWork := addAllocatorMovesToPool(addAllocatorMovesToPoolParams{
ID: id,
Pool: p,
Moves: res.Payload.Moves,
VacateParams: params,
})
return work, hasWork, merr.ErrorOrNil()
}
// waitVacateCompletion waits for the pool to finish when work items have been
// added, stops the pool once all of the items have been processed, and
// returns a multierror with any leftovers from the stopped pool.
func waitVacateCompletion(p *pool.Pool, hasWork bool) error {
var merr = multierror.NewPrefixed("vacate error")
if hasWork {
if err := p.Wait(); err != nil {
unpackMultierror(merr, err)
}
}
if p.Status() < pool.StoppingStatus {
// Stop the pool once we've finished all the work
if err := p.Stop(); err != nil && err != pool.ErrStopOperationTimedOut {
unpackMultierror(merr, err)
}
}
leftovers, _ := p.Leftovers()
for _, lover := range leftovers {
if params, ok := lover.(*VacateClusterParams); ok {
merr = merr.Append(VacateError{
AllocatorID: params.ID,
ResourceID: params.ClusterID,
Kind: params.Kind,
Err: apierror.JSONError{
Message: "was either cancelled or not processed, follow up accordingly",
},
})
}
}
return merr.ErrorOrNil()
}
// nolint since the linter says it's too complex, it's because of the for loop
// combination with ifs. It's better to have this grouped rather than scattered
// around.
func addAllocatorMovesToPool(params addAllocatorMovesToPoolParams) ([]pool.Validator, bool) {
var leftovers []pool.Validator
var vacates = make([]pool.Validator, 0)
if params.Moves == nil {
return leftovers, len(vacates) > 0
}
var filter = params.VacateParams.ClusterFilter
var kindFilter = params.VacateParams.KindFilter
for _, move := range params.Moves.ElasticsearchClusters {
if len(filter) > 0 && !slice.HasString(filter, *move.ClusterID) {
continue
}
var kind = util.Elasticsearch
if kindFilter != "" && kind != kindFilter {
break
}
vacates = append(vacates, newVacateClusterParams(params, *move.ClusterID, kind))
}
for _, move := range params.Moves.KibanaClusters {
if len(filter) > 0 && !slice.HasString(filter, *move.ClusterID) {
continue
}
var kind = util.Kibana
if kindFilter != "" && kind != kindFilter {
break
}
vacates = append(vacates, newVacateClusterParams(params, *move.ClusterID, kind))
}
for _, move := range params.Moves.ApmClusters {
if len(filter) > 0 && !slice.HasString(filter, *move.ClusterID) {
continue
}
var kind = util.Apm
if kindFilter != "" && kind != kindFilter {
break
}
vacates = append(vacates, newVacateClusterParams(params, *move.ClusterID, kind))
}
for _, move := range params.Moves.AppsearchClusters {
if len(filter) > 0 && !slice.HasString(filter, *move.ClusterID) {
continue
}
var kind = util.Appsearch
if kindFilter != "" && kind != kindFilter {
break
}
vacates = append(vacates, newVacateClusterParams(params, *move.ClusterID, kind))
}
for _, move := range params.Moves.EnterpriseSearchClusters {
if len(filter) > 0 && !slice.HasString(filter, *move.ClusterID) {
continue
}
var kind = util.EnterpriseSearch
if kindFilter != "" && kind != kindFilter {
break
}
vacates = append(vacates, newVacateClusterParams(params, *move.ClusterID, kind))
}
if leftover, _ := params.Pool.Add(vacates...); len(leftover) > 0 {
leftovers = append(leftovers, leftover...)
}
return leftovers, len(vacates) > 0
}
func newVacateClusterParams(params addAllocatorMovesToPoolParams, id, kind string) *VacateClusterParams {
clusterParams := VacateClusterParams{
API: params.VacateParams.API,
ID: params.ID,
Kind: kind,
ClusterID: id,
Region: params.VacateParams.Region,
SkipTracking: params.VacateParams.SkipTracking,
ClusterFilter: params.VacateParams.ClusterFilter,
PreferredAllocators: params.VacateParams.PreferredAllocators,
MaxPollRetries: params.VacateParams.MaxPollRetries,
TrackFrequency: params.VacateParams.TrackFrequency,
Output: params.VacateParams.Output,
OutputFormat: params.VacateParams.OutputFormat,
MoveOnly: params.VacateParams.MoveOnly,
PlanOverrides: params.VacateParams.PlanOverrides,
}
if params.VacateParams.AllocatorDown != nil {
clusterParams.AllocatorDown = params.VacateParams.AllocatorDown
}
return &clusterParams
}
// VacateClusterInPool vacates a resource from an allocator, complying
// with the pool.RunFunc signature.
func VacateClusterInPool(p pool.Validator) error {
if p == nil {
return errors.New("allocator vacate: params cannot be nil")
}
if params, ok := p.(*VacateClusterParams); ok {
return VacateCluster(params)
}
return errors.New("allocator vacate: failed casting parameters to *VacateClusterParams")
}
// VacateCluster moves a resource node off an allocator.
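//
// A minimal usage sketch (VacateClusterParams is defined elsewhere in this
// package; apiClient and device stand in for an already-constructed *api.API
// and output writer, and the IDs are illustrative):
//
//	err := allocatorapi.VacateCluster(&allocatorapi.VacateClusterParams{
//		API:       apiClient,
//		Region:    "us-east-1",
//		ID:        "allocator-0000000000",
//		ClusterID: "6ff525333d0745a8b6f04a28f4ea8187",
//		Kind:      util.Elasticsearch,
//		Output:    device,
//	})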
func VacateCluster(params *VacateClusterParams) error {
params, err := fillVacateClusterParams(params)
if err != nil {
return err
}
if err := moveClusterByType(params); err != nil {
return multierror.WithFormat(err, params.OutputFormat)
}
if params.SkipTracking {
return nil
}
return planutil.TrackChange(planutil.TrackChangeParams{
TrackChangeParams: plan.TrackChangeParams{
API: params.API,
ResourceID: params.ClusterID,
Kind: params.Kind,
IgnoreDownstream: true,
Config: plan.TrackFrequencyConfig{
PollFrequency: params.TrackFrequency,
MaxRetries: int(params.MaxPollRetries),
},
},
Writer: params.Output,
Format: params.OutputFormat,
})
}
// fillVacateClusterParams validates the parameters and fills any missing
// properties that are set to a default if empty. Performs a Get on the
// allocator to discover the allocator health if AllocatorDown is nil.
func fillVacateClusterParams(params *VacateClusterParams) (*VacateClusterParams, error) {
if params == nil {
return nil, errors.New("allocator vacate: params cannot be nil")
}
if err := params.Validate(); err != nil {
return nil, multierror.NewPrefixed(fmt.Sprintf(
"allocator %s: resource id [%s][%s]",
params.ID, params.ClusterID, params.Kind), err,
)
}
if params.AllocatorDown == nil {
alloc, err := Get(
GetParams{API: params.API, ID: params.ID, Region: params.Region},
)
if err != nil {
return nil, VacateError{
AllocatorID: params.ID,
ResourceID: params.ClusterID,
Kind: params.Kind,
Ctx: "allocator health autodiscovery",
Err: err,
}
}
if alloc.Status != nil {
params.AllocatorDown = ec.Bool(!*alloc.Status.Connected || !*alloc.Status.Healthy)
}
}
if params.MaxPollRetries == 0 {
params.MaxPollRetries = util.DefaultRetries
}
if params.TrackFrequency.Nanoseconds() == 0 {
params.TrackFrequency = util.DefaultPollFrequency
}
return params, nil
}
// newMoveClusterParams performs a validate-only move call to obtain the
// calculated plan for the resource and builds the MoveClustersByTypeParams
// that are used to perform the actual vacate.
func newMoveClusterParams(params *VacateClusterParams) (*platform_infrastructure.MoveClustersByTypeParams, error) {
// By setting the ClusterID in the request body, the API will only return the matched cluster's plan information.
// This greatly reduces the amount of work that the API has to perform to return the calculated plan.
req := getVacateRequestByClusterID(params.ClusterID, params.Kind)
res, err := params.API.V1API.PlatformInfrastructure.MoveClusters(
platform_infrastructure.NewMoveClustersParams().
WithAllocatorDown(params.AllocatorDown).
WithMoveOnly(params.MoveOnly).
WithAllocatorID(params.ID).
WithContext(api.WithRegion(context.Background(), params.Region)).
WithValidateOnly(ec.Bool(true)).
WithBody(req),
params.AuthWriter,
)
if err != nil {
return nil, VacateError{
AllocatorID: params.ID,
ResourceID: params.ClusterID,
Kind: params.Kind,
Ctx: "failed obtaining default vacate parameters",
Err: apierror.Wrap(err),
}
}
req = ComputeVacateRequest(res.Payload.Moves,
[]string{params.ClusterID},
params.PreferredAllocators,
params.PlanOverrides,
)
var moveParams = platform_infrastructure.NewMoveClustersByTypeParams().
WithAllocatorID(params.ID).
WithAllocatorDown(params.AllocatorDown).
WithContext(api.WithRegion(context.Background(), params.Region)).
WithBody(req)
if len(req.ElasticsearchClusters) > 0 {
moveParams.SetClusterType(util.Elasticsearch)
}
if len(req.KibanaClusters) > 0 {
moveParams.SetClusterType(util.Kibana)
}
if len(req.ApmClusters) > 0 {
moveParams.SetClusterType(util.Apm)
}
if len(req.AppsearchClusters) > 0 {
moveParams.SetClusterType(util.Appsearch)
}
if len(req.EnterpriseSearchClusters) > 0 {
moveParams.SetClusterType(util.EnterpriseSearch)
}
return moveParams, nil
}
// moveClusterByType moves a resource's node from its allocator
func moveClusterByType(params *VacateClusterParams) error {
moveParams, err := newMoveClusterParams(params)
if err != nil {
return err
}
res, err := params.API.V1API.PlatformInfrastructure.MoveClustersByType(
moveParams, params.AuthWriter,
)
if err != nil {
return VacateError{
AllocatorID: params.ID,
ResourceID: params.ClusterID,
Kind: params.Kind,
Ctx: "resource move API call error",
Err: apierror.Wrap(err),
}
}
return CheckVacateFailures(res.Payload.Failures, params.ClusterFilter, params.ID)
}
// CheckVacateFailures iterates over the list of failures, returning a
// multierror of VacateError if any failures are found.
//
// nolint because of the complexity score here
func CheckVacateFailures(failures *models.MoveClustersDetails, filter []string, allocatorID string) error {
if failures == nil {
return nil
}
var merr = multierror.NewPrefixed("vacate error")
const errMsgFmt = "%s (%s)"
const errCtx = "failed vacating"
for _, failure := range failures.ElasticsearchClusters {
if len(filter) > 0 && !slice.HasString(filter, *failure.ClusterID) {
continue
}
var ferr error
if len(failure.Errors) > 0 {
var err = failure.Errors[0]
ferr = fmt.Errorf(errMsgFmt, *err.Message, *err.Code)
}
// Guard against failure entries with no error details to avoid a nil dereference.
if ferr != nil && !strings.Contains(ferr.Error(), PlanPendingMessage) {
merr = merr.Append(VacateError{
AllocatorID: allocatorID,
ResourceID: *failure.ClusterID,
Kind: util.Elasticsearch,
Ctx: errCtx,
Err: ferr,
})
}
}
for _, failure := range failures.KibanaClusters {
if len(filter) > 0 && !slice.HasString(filter, *failure.ClusterID) {
continue
}
var ferr error
if len(failure.Errors) > 0 {
var err = failure.Errors[0]
ferr = fmt.Errorf(errMsgFmt, *err.Message, *err.Code)
}
// Guard against failure entries with no error details to avoid a nil dereference.
if ferr != nil && !strings.Contains(ferr.Error(), PlanPendingMessage) {
merr = merr.Append(VacateError{
AllocatorID: allocatorID,
ResourceID: *failure.ClusterID,
Kind: util.Kibana,
Ctx: errCtx,
Err: ferr,
})
}
}
for _, failure := range failures.ApmClusters {
if len(filter) > 0 && !slice.HasString(filter, *failure.ClusterID) {
continue
}
var ferr error
if len(failure.Errors) > 0 {
var err = failure.Errors[0]
ferr = fmt.Errorf(errMsgFmt, *err.Message, *err.Code)
}
// Guard against failure entries with no error details to avoid a nil dereference.
if ferr != nil && !strings.Contains(ferr.Error(), PlanPendingMessage) {
merr = merr.Append(VacateError{
AllocatorID: allocatorID,
ResourceID: *failure.ClusterID,
Kind: util.Apm,
Ctx: errCtx,
Err: ferr,
})
}
}
for _, failure := range failures.AppsearchClusters {
if len(filter) > 0 && !slice.HasString(filter, *failure.ClusterID) {
continue
}
var ferr error
if len(failure.Errors) > 0 {
var err = failure.Errors[0]
ferr = fmt.Errorf(errMsgFmt, *err.Message, *err.Code)
}
// Guard against failure entries with no error details to avoid a nil dereference.
if ferr != nil && !strings.Contains(ferr.Error(), PlanPendingMessage) {
merr = merr.Append(VacateError{
AllocatorID: allocatorID,
ResourceID: *failure.ClusterID,
Kind: util.Appsearch,
Ctx: errCtx,
Err: ferr,
})
}
}
for _, failure := range failures.EnterpriseSearchClusters {
if len(filter) > 0 && !slice.HasString(filter, *failure.ClusterID) {
continue
}
var ferr error
if len(failure.Errors) > 0 {
var err = failure.Errors[0]
ferr = fmt.Errorf(errMsgFmt, *err.Message, *err.Code)
}
// Guard against failure entries with no error details to avoid a nil dereference.
if ferr != nil && !strings.Contains(ferr.Error(), PlanPendingMessage) {
merr = merr.Append(VacateError{
AllocatorID: allocatorID,
ResourceID: *failure.ClusterID,
Kind: util.EnterpriseSearch,
Ctx: errCtx,
Err: ferr,
})
}
}
return merr.ErrorOrNil()
}
// getVacateRequestByClusterID builds a models.MoveClustersRequest containing
// only the specified cluster ID. The request is used as the body of an API
// call that retrieves the calculated plan data needed to move a node.
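//
// For example, a call with an illustrative Kibana ID such as
// getVacateRequestByClusterID("2d9be4", util.Kibana) returns a request
// equivalent to:
//
//	&models.MoveClustersRequest{
//		KibanaClusters: []*models.MoveKibanaClusterConfiguration{
//			{ClusterIds: []string{"2d9be4"}},
//		},
//	}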
func getVacateRequestByClusterID(clusterID, clusterType string) *models.MoveClustersRequest {
var req models.MoveClustersRequest
switch clusterType {
case util.Elasticsearch:
req.ElasticsearchClusters = append(req.ElasticsearchClusters,
&models.MoveElasticsearchClusterConfiguration{
ClusterIds: []string{clusterID},
},
)
case util.Kibana:
req.KibanaClusters = append(req.KibanaClusters,
&models.MoveKibanaClusterConfiguration{
ClusterIds: []string{clusterID},
},
)
case util.Apm:
req.ApmClusters = append(req.ApmClusters,
&models.MoveApmClusterConfiguration{
ClusterIds: []string{clusterID},
},
)
case util.Appsearch:
req.AppsearchClusters = append(req.AppsearchClusters,
&models.MoveAppSearchConfiguration{
ClusterIds: []string{clusterID},
},
)
case util.EnterpriseSearch:
req.EnterpriseSearchClusters = append(req.EnterpriseSearchClusters,
&models.MoveEnterpriseSearchConfiguration{
ClusterIds: []string{clusterID},
},
)
}
return &req
}
// ComputeVacateRequest builds the move request from the tentative resources
// that would be moved, filtering them by ID when resources is specified and
// setting any preferred allocators when they are sent. Any resource plan
// overrides are applied here as well.
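//
// A usage sketch mirroring the call made in newMoveClusterParams (the IDs and
// the override are illustrative; moves would come from a validate-only move
// call):
//
//	req := ComputeVacateRequest(
//		moves, // *models.MoveClustersDetails
//		[]string{"6ff525333d0745a8b6f04a28f4ea8187"}, // only move this resource
//		[]string{"allocator-1111111111"},             // preferred target allocators
//		PlanOverrides{SkipSnapshot: ec.Bool(true)},
//	)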
// nolint due to complexity
func ComputeVacateRequest(pr *models.MoveClustersDetails, resources, to []string, overrides PlanOverrides) *models.MoveClustersRequest {
var req models.MoveClustersRequest
for _, c := range pr.ElasticsearchClusters {
if len(resources) > 0 && !slice.HasString(resources, *c.ClusterID) {
continue
}
if overrides.SkipSnapshot != nil {
c.CalculatedPlan.PlanConfiguration.SkipSnapshot = overrides.SkipSnapshot
}
if overrides.SkipDataMigration != nil {
c.CalculatedPlan.PlanConfiguration.SkipDataMigration = overrides.SkipDataMigration
}
if overrides.OverrideFailsafe != nil {
c.CalculatedPlan.PlanConfiguration.OverrideFailsafe = overrides.OverrideFailsafe
}
c.CalculatedPlan.PlanConfiguration.PreferredAllocators = to
req.ElasticsearchClusters = append(req.ElasticsearchClusters,
&models.MoveElasticsearchClusterConfiguration{
ClusterIds: []string{*c.ClusterID},
PlanOverride: c.CalculatedPlan,
},
)
}
for _, c := range pr.KibanaClusters {
if len(resources) > 0 && !slice.HasString(resources, *c.ClusterID) {
continue
}
c.CalculatedPlan.PlanConfiguration.PreferredAllocators = to
req.KibanaClusters = append(req.KibanaClusters,
&models.MoveKibanaClusterConfiguration{
ClusterIds: []string{*c.ClusterID},
PlanOverride: c.CalculatedPlan,
},
)
}
for _, c := range pr.ApmClusters {
if len(resources) > 0 && !slice.HasString(resources, *c.ClusterID) {
continue
}
c.CalculatedPlan.PlanConfiguration.PreferredAllocators = to
req.ApmClusters = append(req.ApmClusters,
&models.MoveApmClusterConfiguration{
ClusterIds: []string{*c.ClusterID},
PlanOverride: c.CalculatedPlan,
},
)
}
for _, c := range pr.AppsearchClusters {
if len(resources) > 0 && !slice.HasString(resources, *c.ClusterID) {
continue
}
c.CalculatedPlan.PlanConfiguration.PreferredAllocators = to
req.AppsearchClusters = append(req.AppsearchClusters,
&models.MoveAppSearchConfiguration{
ClusterIds: []string{*c.ClusterID},
PlanOverride: c.CalculatedPlan,
},
)
}
for _, c := range pr.EnterpriseSearchClusters {
if len(resources) > 0 && !slice.HasString(resources, *c.ClusterID) {
continue
}
c.CalculatedPlan.PlanConfiguration.PreferredAllocators = to
req.EnterpriseSearchClusters = append(req.EnterpriseSearchClusters,
&models.MoveEnterpriseSearchConfiguration{
ClusterIds: []string{*c.ClusterID},
PlanOverride: c.CalculatedPlan,
},
)
}
return &req
}
// unpackMultierror unpacks the individual errors contained in err and appends
// them to the multierror.Prefixed.
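//
// A brief illustration (err stands in for whatever a pooled vacate call
// returned):
//
//	merr := multierror.NewPrefixed("vacate error")
//	unpackMultierror(merr, err) // flattens *hashimultierror.Error and
//	// *multierror.Prefixed into merr; any other error is appended as-is.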
func unpackMultierror(merr *multierror.Prefixed, err error) {
var hashimerr *hashimultierror.Error
if errors.As(err, &hashimerr) {
for _, v := range hashimerr.Errors {
_ = merr.Append(v)
}
return
}
var prefixed *multierror.Prefixed
if errors.As(err, &prefixed) {
merr.Errors = append(merr.Errors, prefixed.Errors...)
return
}
_ = merr.Append(err)
}