internal/pkg/testing/esutil/ilm.go (167 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package esutil import ( "context" "encoding/json" "io" "net/http" "strconv" "strings" "github.com/rs/zerolog" "github.com/elastic/go-elasticsearch/v8" ) // Can be cleaner but it's temporary bootstrap until it's moved to the elasticseach system index plugin const ( defaultILMPolicy = ` { "policy":{ "phases":{ } } }` ilmRolloverSize = 300 ilmRolloverAge = 30 ilmDeleteAge = 90 ilmPolicySuffix = "ilm-policy" ) func EnsureILMPolicy(ctx context.Context, cli *elasticsearch.Client, name string) error { policy := GetILMPolicyName(name) lg := zerolog.Ctx(ctx).With().Str("policy", policy).Logger() res, err := cli.ILM.GetLifecycle( cli.ILM.GetLifecycle.WithPolicy(policy), cli.ILM.GetLifecycle.WithContext(ctx), ) // Check if general error, network failure, timeout, etc. if err != nil { lg.Info().Err(err).Msgf("Failed to fetch ILM policy") return err } defer res.Body.Close() // Only create ILM lifecycle if we got a definite 404 from the server if res.StatusCode == http.StatusNotFound { // Got 404. Could be from elastic, could be from the cloud if deployment is not found. // Parse response to figure out the details from JSON body errRes, err := parseResponseError(res, zerolog.Ctx(ctx)) if err != nil { lg.Warn().Err(err).Msgf("Failed to parse ILM policy not found response.") return err } // Got 404 from Elasticsearch if errRes.Status == http.StatusNotFound { lg.Info().Msgf("ILM policy is not found. Create a new one.") err = createILMPolicy(ctx, cli, name) if err != nil { return err } return nil } // Return elasticsearch error details return &ClientError{ StatusCode: errRes.Status, Type: errRes.Error.Type, Reason: errRes.Error.Reason, } } // Check for other possible error responses err = checkResponseError(res, zerolog.Ctx(ctx)) if err != nil { lg.Info().Err(err).Msgf("Error response on fetching ILM Policy") return err } // No error found, ILM policy already exists lg.Info().Msg("Found ILM policy") // Fetched the ILM policy successfully. Check the settings and if they need to be updated. if res.StatusCode == http.StatusOK { err = checkUpdateILMPolicy(ctx, cli, lg, policy, res.Body) if err != nil { lg.Warn().Err(err).Msg("Failed to update ILM policy settings") return err } } return nil } func checkUpdateILMPolicy(ctx context.Context, cli *elasticsearch.Client, lg zerolog.Logger, name string, r io.Reader) error { policy := GetILMPolicyName(name) lg.Info().Msg("Check ILM policy settings if they need to be updated") var m stringMap err := json.NewDecoder(r).Decode(&m) if err != nil { return err } policyMap := m.GetMap(policy) phases := policyMap.GetMap("policy").GetMap("phases") rollover := phases.GetMap("hot").GetMap("actions").GetMap("rollover") existingRolloverSize := rollover.GetString("max_size") existingRolloverAge := rollover.GetString("max_age") existingDeleteAge := phases.GetMap("delete").GetString("min_age") newRolloverSize := renderSize(ilmRolloverSize) newRolloverAge := renderDays(ilmRolloverAge) newDeleteAge := renderDays(ilmDeleteAge) if existingRolloverSize != newRolloverSize || existingRolloverAge != newRolloverAge || existingDeleteAge != newDeleteAge { lg.Info(). Str("old_rollover_size", existingRolloverSize). Str("new_rollover_size", newRolloverSize). Str("old_rollover_age", existingRolloverAge). Str("new_rollover_age", newRolloverAge). Str("old_delete_age", existingDeleteAge). Str("new_delete_age", newDeleteAge). Msgf("ILM policy settings were changed. Check if needs to be updated.") return createILMPolicy(ctx, cli, name) } return nil } func createILMPolicy(ctx context.Context, cli *elasticsearch.Client, name string) error { policy := GetILMPolicyName(name) body := mustRender(renderILMPolicy(ilmRolloverSize, ilmRolloverAge, ilmDeleteAge)) // The elastic will respond with an error if the ILM policy doesn't exists // in that case let's just create the ILM policy zerolog.Ctx(ctx).Debug().Str("policy", policy).Str("body", body).Msg("Creating ILM policy") res, err := cli.ILM.PutLifecycle(policy, cli.ILM.PutLifecycle.WithBody(strings.NewReader(body)), cli.ILM.PutLifecycle.WithContext(ctx), ) if err != nil { return err } defer res.Body.Close() return checkResponseError(res, zerolog.Ctx(ctx)) } func GetILMPolicyName(name string) string { return name + "-" + ilmPolicySuffix } func renderILMPolicy(rolloverSize, rolloverAge, deleteAge int) (s string, err error) { var m stringMap err = json.Unmarshal([]byte(defaultILMPolicy), &m) if err != nil { return s, err } phases := m.GetMap("policy").GetMap("phases") if rolloverAge != 0 || rolloverSize != 0 { hot := make(stringMap) rollover := make(stringMap) actions := make(stringMap) if rolloverSize != 0 { rollover["max_size"] = renderSize(rolloverSize) } if rolloverAge != 0 { rollover["max_age"] = renderDays(rolloverAge) } actions["rollover"] = rollover hot["actions"] = actions phases["hot"] = hot } if deleteAge != 0 { delete := make(stringMap) delete["min_age"] = renderDays(deleteAge) actions := make(stringMap) actions["delete"] = struct{}{} delete["actions"] = actions phases["delete"] = delete } b, err := json.Marshal(m) if err != nil { return s, err } return string(b), nil } func renderSize(size int) string { return strconv.Itoa(size) + "gb" } func renderDays(days int) string { return strconv.Itoa(days) + "d" } func mustRender(s string, err error) string { if err != nil { panic(err) } return s }